• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5700

04 Dec 2025 09:07AM UTC coverage: 58.619%. First build
#5700

Pull #3970

travis-ci

arnongilboa
Simplify DataImportCron ServiceAccount authorization

Add ServiceAccountName to DataImportCron spec, replacing CreatedBy
which was added in #3946.

In case of DataImportCron with PVC source, the controller checks the
ServiceAccount is authorized to clone the source PVC.

Signed-off-by: Arnon Gilboa <agilboa@redhat.com>
Pull Request #3970: Simplify DataImportCron ServiceAccount authorization

4 of 11 new or added lines in 1 file covered. (36.36%)

17296 of 29506 relevant lines covered (58.62%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.1
/pkg/controller/dataimportcron-controller.go
1
/*
2
Copyright 2021 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
limitations under the License.
14
See the License for the specific language governing permissions and
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "fmt"
22
        "net/url"
23
        "reflect"
24
        "sort"
25
        "strings"
26
        "time"
27

28
        "github.com/containers/image/v5/docker/reference"
29
        "github.com/go-logr/logr"
30
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
31
        imagev1 "github.com/openshift/api/image/v1"
32
        secv1 "github.com/openshift/api/security/v1"
33
        "github.com/pkg/errors"
34
        cronexpr "github.com/robfig/cron/v3"
35

36
        authorizationv1 "k8s.io/api/authorization/v1"
37
        batchv1 "k8s.io/api/batch/v1"
38
        corev1 "k8s.io/api/core/v1"
39
        storagev1 "k8s.io/api/storage/v1"
40
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
41
        "k8s.io/apimachinery/pkg/api/meta"
42
        "k8s.io/apimachinery/pkg/api/resource"
43
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
44
        "k8s.io/apimachinery/pkg/labels"
45
        "k8s.io/apimachinery/pkg/runtime"
46
        "k8s.io/apimachinery/pkg/types"
47
        "k8s.io/client-go/tools/record"
48
        openapicommon "k8s.io/kube-openapi/pkg/common"
49
        "k8s.io/utils/ptr"
50

51
        "sigs.k8s.io/controller-runtime/pkg/client"
52
        "sigs.k8s.io/controller-runtime/pkg/controller"
53
        "sigs.k8s.io/controller-runtime/pkg/event"
54
        "sigs.k8s.io/controller-runtime/pkg/handler"
55
        "sigs.k8s.io/controller-runtime/pkg/manager"
56
        "sigs.k8s.io/controller-runtime/pkg/predicate"
57
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
58
        "sigs.k8s.io/controller-runtime/pkg/source"
59

60
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
61
        "kubevirt.io/containerized-data-importer/pkg/common"
62
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
63
        dvc "kubevirt.io/containerized-data-importer/pkg/controller/datavolume"
64
        metrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-controller"
65
        "kubevirt.io/containerized-data-importer/pkg/operator"
66
        "kubevirt.io/containerized-data-importer/pkg/util"
67
        "kubevirt.io/containerized-data-importer/pkg/util/naming"
68
)
69

70
const (
71
        // ErrDataSourceAlreadyManaged provides a const to indicate DataSource already managed error
72
        ErrDataSourceAlreadyManaged = "ErrDataSourceAlreadyManaged"
73
        // MessageDataSourceAlreadyManaged provides a const to form DataSource already managed error message
74
        MessageDataSourceAlreadyManaged = "DataSource %s is already managed by DataImportCron %s"
75
)
76

77
// DataImportCronReconciler members
78
type DataImportCronReconciler struct {
79
        client          client.Client
80
        uncachedClient  client.Client
81
        recorder        record.EventRecorder
82
        scheme          *runtime.Scheme
83
        log             logr.Logger
84
        image           string
85
        pullPolicy      string
86
        cdiNamespace    string
87
        installerLabels map[string]string
88
}
89

90
const (
91
        // AnnSourceDesiredDigest is the digest of the pending updated image
92
        AnnSourceDesiredDigest = cc.AnnAPIGroup + "/storage.import.sourceDesiredDigest"
93
        // AnnImageStreamDockerRef is the ImageStream Docker reference
94
        AnnImageStreamDockerRef = cc.AnnAPIGroup + "/storage.import.imageStreamDockerRef"
95
        // AnnNextCronTime is the next time stamp which satisfies the cron expression
96
        AnnNextCronTime = cc.AnnAPIGroup + "/storage.import.nextCronTime"
97
        // AnnLastCronTime is the cron last execution time stamp
98
        AnnLastCronTime = cc.AnnAPIGroup + "/storage.import.lastCronTime"
99
        // AnnLastUseTime is the PVC last use time stamp
100
        AnnLastUseTime = cc.AnnAPIGroup + "/storage.import.lastUseTime"
101
        // AnnStorageClass is the cron DV's storage class
102
        AnnStorageClass = cc.AnnAPIGroup + "/storage.import.storageClass"
103

104
        dataImportControllerName    = "dataimportcron-controller"
105
        digestSha256Prefix          = "sha256:"
106
        digestUIDPrefix             = "uid:"
107
        digestDvNameSuffixLength    = 12
108
        cronJobUIDSuffixLength      = 8
109
        defaultImportsToKeepPerCron = 3
110
)
111

112
var ErrNotManagedByCron = errors.New("DataSource is not managed by this DataImportCron")
113

114
// Reconcile loop for DataImportCronReconciler
115
func (r *DataImportCronReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
1✔
116
        dataImportCron := &cdiv1.DataImportCron{}
1✔
117
        if err := r.client.Get(ctx, req.NamespacedName, dataImportCron); cc.IgnoreNotFound(err) != nil {
2✔
118
                return reconcile.Result{}, err
1✔
119
        } else if err != nil || dataImportCron.DeletionTimestamp != nil {
3✔
120
                err := r.cleanup(ctx, req.NamespacedName)
1✔
121
                return reconcile.Result{}, err
1✔
122
        }
1✔
123
        shouldReconcile, err := r.shouldReconcileCron(ctx, dataImportCron)
1✔
124
        if !shouldReconcile || err != nil {
1✔
125
                return reconcile.Result{}, err
×
126
        }
×
127

128
        if err := r.initCron(ctx, dataImportCron); err != nil {
1✔
129
                return reconcile.Result{}, err
×
130
        }
×
131

132
        return r.update(ctx, dataImportCron)
1✔
133
}
134

135
func (r *DataImportCronReconciler) shouldReconcileCron(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
136
        dataSource := &cdiv1.DataSource{}
1✔
137
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: cron.Spec.ManagedDataSource}, dataSource); err != nil {
2✔
138
                if k8serrors.IsNotFound(err) {
2✔
139
                        return true, nil
1✔
140
                }
1✔
141
                return false, err
×
142
        }
143
        dataSourceCronLabel := dataSource.Labels[common.DataImportCronLabel]
1✔
144
        if dataSourceCronLabel == cron.Name || dataSourceCronLabel == "" {
2✔
145
                return true, nil
1✔
146
        }
1✔
147
        otherCron := &cdiv1.DataImportCron{}
1✔
148
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dataSourceCronLabel}, otherCron); err != nil {
2✔
149
                if k8serrors.IsNotFound(err) {
2✔
150
                        return true, nil
1✔
151
                }
1✔
152
                return false, err
×
153
        }
154
        if otherCron.Spec.ManagedDataSource == dataSource.Name {
2✔
155
                msg := fmt.Sprintf(MessageDataSourceAlreadyManaged, dataSource.Name, otherCron.Name)
1✔
156
                r.recorder.Event(cron, corev1.EventTypeWarning, ErrDataSourceAlreadyManaged, msg)
1✔
157
                r.log.V(3).Info(msg)
1✔
158
                return false, nil
1✔
159
        }
1✔
160
        return true, nil
×
161
}
162

163
func (r *DataImportCronReconciler) initCron(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
164
        if dataImportCron.Spec.Schedule == "" {
2✔
165
                return nil
1✔
166
        }
1✔
167
        if isControllerPolledSource(dataImportCron) {
2✔
168
                if dataImportCron.Annotations[AnnNextCronTime] == "" {
2✔
169
                        cc.AddAnnotation(dataImportCron, AnnNextCronTime, time.Now().Format(time.RFC3339))
1✔
170
                }
1✔
171
                return nil
1✔
172
        }
173
        if !isURLSource(dataImportCron) {
1✔
174
                return nil
×
175
        }
×
176
        exists, err := r.cronJobExistsAndUpdated(ctx, dataImportCron)
1✔
177
        if exists || err != nil {
2✔
178
                return err
1✔
179
        }
1✔
180
        cronJob, err := r.newCronJob(dataImportCron)
1✔
181
        if err != nil {
1✔
182
                return err
×
183
        }
×
184
        if err := r.client.Create(ctx, cronJob); err != nil {
1✔
185
                return err
×
186
        }
×
187
        job, err := r.newInitialJob(dataImportCron, cronJob)
1✔
188
        if err != nil {
1✔
189
                return err
×
190
        }
×
191
        if err := r.client.Create(ctx, job); err != nil {
1✔
192
                return err
×
193
        }
×
194
        return nil
1✔
195
}
196

197
func (r *DataImportCronReconciler) getImageStream(ctx context.Context, imageStreamName, imageStreamNamespace string) (*imagev1.ImageStream, string, error) {
1✔
198
        if imageStreamName == "" || imageStreamNamespace == "" {
1✔
199
                return nil, "", errors.Errorf("Missing ImageStream name or namespace")
×
200
        }
×
201
        imageStream := &imagev1.ImageStream{}
1✔
202
        name, tag, err := splitImageStreamName(imageStreamName)
1✔
203
        if err != nil {
2✔
204
                return nil, "", err
1✔
205
        }
1✔
206
        imageStreamNamespacedName := types.NamespacedName{
1✔
207
                Namespace: imageStreamNamespace,
1✔
208
                Name:      name,
1✔
209
        }
1✔
210
        if err := r.client.Get(ctx, imageStreamNamespacedName, imageStream); err != nil {
2✔
211
                return nil, "", err
1✔
212
        }
1✔
213
        return imageStream, tag, nil
1✔
214
}
215

216
func getImageStreamDigest(imageStream *imagev1.ImageStream, imageStreamTag string) (string, string, error) {
1✔
217
        if imageStream == nil {
1✔
218
                return "", "", errors.Errorf("No ImageStream")
×
219
        }
×
220
        tags := imageStream.Status.Tags
1✔
221
        if len(tags) == 0 {
1✔
222
                return "", "", errors.Errorf("ImageStream %s has no tags", imageStream.Name)
×
223
        }
×
224

225
        tagIdx := 0
1✔
226
        if len(imageStreamTag) > 0 {
2✔
227
                tagIdx = -1
1✔
228
                for i, tag := range tags {
2✔
229
                        if tag.Tag == imageStreamTag {
2✔
230
                                tagIdx = i
1✔
231
                                break
1✔
232
                        }
233
                }
234
        }
235
        if tagIdx == -1 {
2✔
236
                return "", "", errors.Errorf("ImageStream %s has no tag %s", imageStream.Name, imageStreamTag)
1✔
237
        }
1✔
238

239
        if len(tags[tagIdx].Items) == 0 {
2✔
240
                return "", "", errors.Errorf("ImageStream %s tag %s has no items", imageStream.Name, imageStreamTag)
1✔
241
        }
1✔
242
        return tags[tagIdx].Items[0].Image, tags[tagIdx].Items[0].DockerImageReference, nil
1✔
243
}
244

245
func splitImageStreamName(imageStreamName string) (string, string, error) {
1✔
246
        if subs := strings.Split(imageStreamName, ":"); len(subs) == 1 {
2✔
247
                return imageStreamName, "", nil
1✔
248
        } else if len(subs) == 2 && len(subs[0]) > 0 && len(subs[1]) > 0 {
3✔
249
                return subs[0], subs[1], nil
1✔
250
        }
1✔
251
        return "", "", errors.Errorf("Illegal ImageStream name %s", imageStreamName)
1✔
252
}
253

254
func (r *DataImportCronReconciler) pollSourceDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
255
        nextTimeStr := dataImportCron.Annotations[AnnNextCronTime]
1✔
256
        if nextTimeStr == "" {
1✔
257
                return r.setNextCronTime(dataImportCron)
×
258
        }
×
259
        nextTime, err := time.Parse(time.RFC3339, nextTimeStr)
1✔
260
        if err != nil {
1✔
261
                return reconcile.Result{}, err
×
262
        }
×
263
        if nextTime.After(time.Now()) {
2✔
264
                return r.setNextCronTime(dataImportCron)
1✔
265
        }
1✔
266
        switch {
1✔
267
        case isImageStreamSource(dataImportCron):
1✔
268
                if err := r.updateImageStreamDesiredDigest(ctx, dataImportCron); err != nil {
2✔
269
                        return reconcile.Result{}, err
1✔
270
                }
1✔
271
        case isPvcSource(dataImportCron):
1✔
272
                if err := r.updatePvcDesiredDigest(ctx, dataImportCron); err != nil {
2✔
273
                        return reconcile.Result{}, err
1✔
274
                }
1✔
275
        case isNodePull(dataImportCron):
1✔
276
                if done, err := r.updateContainerImageDesiredDigest(ctx, dataImportCron); !done {
2✔
277
                        return reconcile.Result{RequeueAfter: 3 * time.Second}, err
1✔
278
                } else if err != nil {
2✔
279
                        return reconcile.Result{}, err
×
280
                }
×
281
        }
282

283
        return r.setNextCronTime(dataImportCron)
1✔
284
}
285

286
func (r *DataImportCronReconciler) setNextCronTime(dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
287
        now := time.Now()
1✔
288
        expr, err := cronexpr.ParseStandard(dataImportCron.Spec.Schedule)
1✔
289
        if err != nil {
1✔
290
                return reconcile.Result{}, err
×
291
        }
×
292
        nextTime := expr.Next(now)
1✔
293
        requeueAfter := nextTime.Sub(now)
1✔
294
        res := reconcile.Result{RequeueAfter: requeueAfter}
1✔
295
        cc.AddAnnotation(dataImportCron, AnnNextCronTime, nextTime.Format(time.RFC3339))
1✔
296
        return res, err
1✔
297
}
298

299
func isImageStreamSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
300
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
301
        return err == nil && regSource.ImageStream != nil
1✔
302
}
1✔
303

304
func isURLSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
305
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
306
        return err == nil && regSource.URL != nil
1✔
307
}
1✔
308

309
func isNodePull(cron *cdiv1.DataImportCron) bool {
1✔
310
        regSource, err := getCronRegistrySource(cron)
1✔
311
        return err == nil && regSource != nil && regSource.PullMethod != nil &&
1✔
312
                *regSource.PullMethod == cdiv1.RegistryPullNode
1✔
313
}
1✔
314

315
func getCronRegistrySource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourceRegistry, error) {
1✔
316
        if !isCronRegistrySource(cron) {
2✔
317
                return nil, errors.Errorf("Cron has no registry source %s", cron.Name)
1✔
318
        }
1✔
319
        return cron.Spec.Template.Spec.Source.Registry, nil
1✔
320
}
321

322
func isCronRegistrySource(cron *cdiv1.DataImportCron) bool {
1✔
323
        source := cron.Spec.Template.Spec.Source
1✔
324
        return source != nil && source.Registry != nil
1✔
325
}
1✔
326

327
func getCronPvcSource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourcePVC, error) {
1✔
328
        if !isPvcSource(cron) {
1✔
329
                return nil, errors.Errorf("Cron has no PVC source %s", cron.Name)
×
330
        }
×
331
        return cron.Spec.Template.Spec.Source.PVC, nil
1✔
332
}
333

334
func isPvcSource(cron *cdiv1.DataImportCron) bool {
1✔
335
        source := cron.Spec.Template.Spec.Source
1✔
336
        return source != nil && source.PVC != nil
1✔
337
}
1✔
338

339
func isControllerPolledSource(cron *cdiv1.DataImportCron) bool {
1✔
340
        return isImageStreamSource(cron) || isPvcSource(cron) || isNodePull(cron)
1✔
341
}
1✔
342

343
func (r *DataImportCronReconciler) update(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
344
        res := reconcile.Result{}
1✔
345

1✔
346
        dv, pvc, err := r.getImportState(ctx, dataImportCron)
1✔
347
        if err != nil {
1✔
348
                return res, err
×
349
        }
×
350

351
        dataImportCronCopy := dataImportCron.DeepCopy()
1✔
352
        imports := dataImportCron.Status.CurrentImports
1✔
353
        importSucceeded := false
1✔
354

1✔
355
        dataVolume := dataImportCron.Spec.Template
1✔
356
        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
1✔
357
        desiredStorageClass, err := cc.GetStorageClassByNameWithVirtFallback(ctx, r.client, explicitScName, dataVolume.Spec.ContentType)
1✔
358
        if err != nil {
1✔
359
                return res, err
×
360
        }
×
361
        if desiredStorageClass != nil {
2✔
362
                if deleted, err := r.deleteOutdatedPendingPvc(ctx, pvc, desiredStorageClass.Name, dataImportCron.Name); deleted || err != nil {
2✔
363
                        return res, err
1✔
364
                }
1✔
365
                currentSc, hasCurrent := dataImportCron.Annotations[AnnStorageClass]
1✔
366
                desiredSc := desiredStorageClass.Name
1✔
367
                if hasCurrent && currentSc != desiredSc {
2✔
368
                        r.log.Info("Storage class changed, delete most recent source on the old sc as it's no longer the desired", "currentSc", currentSc, "desiredSc", desiredSc)
1✔
369
                        if err := r.handleStorageClassChange(ctx, dataImportCron, desiredSc); err != nil {
1✔
370
                                return res, err
×
371
                        }
×
372
                        return reconcile.Result{RequeueAfter: time.Second}, nil
1✔
373
                }
374
                cc.AddAnnotation(dataImportCron, AnnStorageClass, desiredStorageClass.Name)
1✔
375
        }
376
        format, err := r.getSourceFormat(ctx, desiredStorageClass)
1✔
377
        if err != nil {
1✔
378
                return res, err
×
379
        }
×
380
        snapshot, err := r.getSnapshot(ctx, dataImportCron)
1✔
381
        if err != nil {
1✔
382
                return res, err
×
383
        }
×
384

385
        handlePopulatedPvc := func() error {
2✔
386
                if pvc != nil {
2✔
387
                        if err := r.updateSource(ctx, dataImportCron, pvc); err != nil {
1✔
388
                                return err
×
389
                        }
×
390
                }
391
                importSucceeded = true
1✔
392
                if err := r.handleCronFormat(ctx, dataImportCron, pvc, format, desiredStorageClass); err != nil {
1✔
393
                        return err
×
394
                }
×
395

396
                return nil
1✔
397
        }
398

399
        switch {
1✔
400
        case dv != nil:
1✔
401
                switch dv.Status.Phase {
1✔
402
                case cdiv1.Succeeded:
1✔
403
                        if err := handlePopulatedPvc(); err != nil {
1✔
404
                                return res, err
×
405
                        }
×
406
                case cdiv1.ImportScheduled:
1✔
407
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "Import is scheduled", scheduled)
1✔
408
                case cdiv1.ImportInProgress:
1✔
409
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionTrue, "Import is progressing", inProgress)
1✔
410
                default:
1✔
411
                        dvPhase := string(dv.Status.Phase)
1✔
412
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dvPhase), dvPhase)
1✔
413
                }
414
        case pvc != nil && pvc.Status.Phase == corev1.ClaimBound:
1✔
415
                if err := handlePopulatedPvc(); err != nil {
1✔
416
                        return res, err
×
417
                }
×
418
        case snapshot != nil:
1✔
419
                if format == cdiv1.DataImportCronSourceFormatPvc {
1✔
420
                        if err := r.client.Delete(ctx, snapshot); cc.IgnoreNotFound(err) != nil {
×
421
                                return res, err
×
422
                        }
×
423
                        r.log.Info("Snapshot is around even though format switched to PVC, requeueing")
×
424
                        return reconcile.Result{RequeueAfter: time.Second}, nil
×
425
                }
426
                // Below k8s 1.29 there's no way to know the source volume mode
427
                // Let's at least expose this info on our own snapshots
428
                if _, ok := snapshot.Annotations[cc.AnnSourceVolumeMode]; !ok {
2✔
429
                        volMode, err := inferVolumeModeForSnapshot(ctx, r.client, dataImportCron)
1✔
430
                        if err != nil {
1✔
431
                                return res, err
×
432
                        }
×
433
                        if volMode != nil {
2✔
434
                                cc.AddAnnotation(snapshot, cc.AnnSourceVolumeMode, string(*volMode))
1✔
435
                        }
1✔
436
                }
437
                // Copy labels found on dataSource to the existing snapshot in case of upgrades.
438
                dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
439
                if err != nil {
2✔
440
                        if !k8serrors.IsNotFound(err) && !errors.Is(err, ErrNotManagedByCron) {
1✔
441
                                return res, err
×
442
                        }
×
443
                } else {
1✔
444
                        cc.CopyAllowedLabels(dataSource.Labels, snapshot, true)
1✔
445
                }
1✔
446
                if err := r.updateSource(ctx, dataImportCron, snapshot); err != nil {
1✔
447
                        return res, err
×
448
                }
×
449
                importSucceeded = true
1✔
450
        default:
1✔
451
                if len(imports) > 0 {
2✔
452
                        imports = imports[1:]
1✔
453
                        dataImportCron.Status.CurrentImports = imports
1✔
454
                }
1✔
455
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
456
        }
457

458
        if importSucceeded {
2✔
459
                if err := updateDataImportCronOnSuccess(dataImportCron); err != nil {
1✔
460
                        return res, err
×
461
                }
×
462
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
463
                if err := r.garbageCollectOldImports(ctx, dataImportCron); err != nil {
1✔
464
                        return res, err
×
465
                }
×
466
        }
467

468
        if err := r.updateDataSource(ctx, dataImportCron, format); err != nil {
1✔
469
                return res, err
×
470
        }
×
471

472
        // Skip if schedule is disabled
473
        if isControllerPolledSource(dataImportCron) && dataImportCron.Spec.Schedule != "" {
2✔
474
                // We use the poll returned reconcile.Result for RequeueAfter if needed
1✔
475
                pollRes, err := r.pollSourceDigest(ctx, dataImportCron)
1✔
476
                if err != nil {
2✔
477
                        return pollRes, err
1✔
478
                }
1✔
479
                res = pollRes
1✔
480
        }
481

482
        desiredDigest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
483
        digestUpdated := desiredDigest != "" && (len(imports) == 0 || desiredDigest != imports[0].Digest)
1✔
484
        if digestUpdated {
2✔
485
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Source digest updated since last import", outdated)
1✔
486
                if dv != nil {
1✔
487
                        if err := r.deleteErroneousDataVolume(ctx, dataImportCron, dv); err != nil {
×
488
                                return res, err
×
489
                        }
×
490
                }
491
                if importSucceeded || len(imports) == 0 {
2✔
492
                        if err := r.createImportDataVolume(ctx, dataImportCron); err != nil {
2✔
493
                                return res, err
1✔
494
                        }
1✔
495
                }
496
        } else if importSucceeded {
2✔
497
                if err := r.updateDataImportCronSuccessCondition(dataImportCron, format, snapshot); err != nil {
1✔
498
                        return res, err
×
499
                }
×
500
        } else if len(imports) > 0 {
2✔
501
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Import is progressing", inProgress)
1✔
502
        } else {
2✔
503
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "No source digest", noDigest)
1✔
504
        }
1✔
505

506
        if err := updateLastExecutionTimestamp(dataImportCron); err != nil {
1✔
507
                return res, err
×
508
        }
×
509

510
        if !reflect.DeepEqual(dataImportCron, dataImportCronCopy) {
2✔
511
                if err := r.client.Update(ctx, dataImportCron); err != nil {
1✔
512
                        return res, err
×
513
                }
×
514
        }
515
        return res, nil
1✔
516
}
517

518
// Returns the current import DV if exists, and the last imported PVC
519
func (r *DataImportCronReconciler) getImportState(ctx context.Context, cron *cdiv1.DataImportCron) (*cdiv1.DataVolume, *corev1.PersistentVolumeClaim, error) {
1✔
520
        imports := cron.Status.CurrentImports
1✔
521
        if len(imports) == 0 {
2✔
522
                return nil, nil, nil
1✔
523
        }
1✔
524

525
        dvName := imports[0].DataVolumeName
1✔
526
        dv := &cdiv1.DataVolume{}
1✔
527
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, dv); err != nil {
2✔
528
                if !k8serrors.IsNotFound(err) {
1✔
529
                        return nil, nil, err
×
530
                }
×
531
                dv = nil
1✔
532
        }
533

534
        pvc := &corev1.PersistentVolumeClaim{}
1✔
535
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, pvc); err != nil {
2✔
536
                if !k8serrors.IsNotFound(err) {
1✔
537
                        return nil, nil, err
×
538
                }
×
539
                pvc = nil
1✔
540
        }
541
        return dv, pvc, nil
1✔
542
}
543

544
// Returns the current import DV if exists, and the last imported PVC
545
func (r *DataImportCronReconciler) getSnapshot(ctx context.Context, cron *cdiv1.DataImportCron) (*snapshotv1.VolumeSnapshot, error) {
1✔
546
        imports := cron.Status.CurrentImports
1✔
547
        if len(imports) == 0 {
2✔
548
                return nil, nil
1✔
549
        }
1✔
550

551
        snapName := imports[0].DataVolumeName
1✔
552
        snapshot := &snapshotv1.VolumeSnapshot{}
1✔
553
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: snapName}, snapshot); err != nil {
2✔
554
                if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
555
                        return nil, err
×
556
                }
×
557
                return nil, nil
1✔
558
        }
559

560
        return snapshot, nil
1✔
561
}
562

563
func (r *DataImportCronReconciler) getDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (*cdiv1.DataSource, error) {
1✔
564
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
565
        dataSource := &cdiv1.DataSource{}
1✔
566
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dataSourceName}, dataSource); err != nil {
2✔
567
                return nil, err
1✔
568
        }
1✔
569
        if dataSource.Labels[common.DataImportCronLabel] != dataImportCron.Name {
1✔
570
                log := r.log.WithName("getCronManagedDataSource")
×
571
                log.Info("DataSource has no DataImportCron label or is not managed by cron, so it is not updated", "name", dataSourceName, "uid", dataSource.UID, "cron", dataImportCron.Name)
×
572
                return nil, ErrNotManagedByCron
×
573
        }
×
574
        return dataSource, nil
1✔
575
}
576

577
func (r *DataImportCronReconciler) updateSource(ctx context.Context, cron *cdiv1.DataImportCron, obj client.Object) error {
1✔
578
        objCopy := obj.DeepCopyObject()
1✔
579
        cc.AddAnnotation(obj, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
580
        r.setDataImportCronResourceLabels(cron, obj)
1✔
581
        if !reflect.DeepEqual(obj, objCopy) {
2✔
582
                if err := r.client.Update(ctx, obj); err != nil {
1✔
583
                        return err
×
584
                }
×
585
        }
586
        return nil
1✔
587
}
588

589
func (r *DataImportCronReconciler) deleteErroneousDataVolume(ctx context.Context, cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) error {
×
590
        log := r.log.WithValues("name", dv.Name).WithValues("uid", dv.UID)
×
591
        if cond := dvc.FindConditionByType(cdiv1.DataVolumeRunning, dv.Status.Conditions); cond != nil {
×
592
                if cond.Status == corev1.ConditionFalse &&
×
593
                        (cond.Reason == common.GenericError || cond.Reason == ImagePullFailedReason) {
×
594
                        log.Info("Delete DataVolume and reset DesiredDigest due to error", "message", cond.Message)
×
595
                        // Unlabel the DV before deleting it, to eliminate reconcile before DIC is updated
×
596
                        dv.Labels[common.DataImportCronLabel] = ""
×
597
                        if err := r.client.Update(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
598
                                return err
×
599
                        }
×
600
                        if err := r.client.Delete(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
601
                                return err
×
602
                        }
×
603
                        cron.Status.CurrentImports = nil
×
604
                }
605
        }
606
        return nil
×
607
}
608

609
func (r *DataImportCronReconciler) updateImageStreamDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
610
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
1✔
611
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
612
        if err != nil {
1✔
613
                return err
×
614
        }
×
615
        if regSource.ImageStream == nil {
1✔
616
                return nil
×
617
        }
×
618
        imageStream, imageStreamTag, err := r.getImageStream(ctx, *regSource.ImageStream, dataImportCron.Namespace)
1✔
619
        if err != nil {
2✔
620
                return err
1✔
621
        }
1✔
622
        digest, dockerRef, err := getImageStreamDigest(imageStream, imageStreamTag)
1✔
623
        if err != nil {
2✔
624
                return err
1✔
625
        }
1✔
626
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
627
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
628
                log.Info("Updating DataImportCron", "digest", digest)
1✔
629
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
630
                cc.AddAnnotation(dataImportCron, AnnImageStreamDockerRef, dockerRef)
1✔
631
        }
1✔
632
        return nil
1✔
633
}
634

635
func (r *DataImportCronReconciler) updateContainerImageDesiredDigest(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
636
        log := r.log.WithValues("name", cron.Name).WithValues("uid", cron.UID)
1✔
637
        podName := getPollerPodName(cron)
1✔
638
        ns := cron.Namespace
1✔
639
        nn := types.NamespacedName{Name: podName, Namespace: ns}
1✔
640
        pod := &corev1.Pod{}
1✔
641

1✔
642
        if err := r.client.Get(ctx, nn, pod); err == nil {
2✔
643
                digest, err := fetchContainerImageDigest(pod)
1✔
644
                if err != nil || digest == "" {
1✔
645
                        return false, err
×
646
                }
×
647
                cc.AddAnnotation(cron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
648
                if cron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
649
                        log.Info("Updating DataImportCron", "digest", digest)
1✔
650
                        cc.AddAnnotation(cron, AnnSourceDesiredDigest, digest)
1✔
651
                }
1✔
652
                return true, r.client.Delete(ctx, pod)
1✔
653
        } else if cc.IgnoreNotFound(err) != nil {
1✔
654
                return false, err
×
655
        }
×
656

657
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(ctx, r.client)
1✔
658
        if err != nil {
1✔
659
                return false, err
×
660
        }
×
661
        platform := cron.Spec.Template.Spec.Source.Registry.Platform
1✔
662
        if platform != nil && platform.Architecture != "" {
1✔
663
                if workloadNodePlacement.NodeSelector == nil {
×
664
                        workloadNodePlacement.NodeSelector = map[string]string{}
×
665
                }
×
666
                workloadNodePlacement.NodeSelector[corev1.LabelArchStable] = platform.Architecture
×
667
        }
668

669
        containerImage := strings.TrimPrefix(*cron.Spec.Template.Spec.Source.Registry.URL, "docker://")
1✔
670

1✔
671
        pod = &corev1.Pod{
1✔
672
                ObjectMeta: metav1.ObjectMeta{
1✔
673
                        Name:      podName,
1✔
674
                        Namespace: ns,
1✔
675
                        OwnerReferences: []metav1.OwnerReference{
1✔
676
                                {
1✔
677
                                        APIVersion:         cron.APIVersion,
1✔
678
                                        Kind:               cron.Kind,
1✔
679
                                        Name:               cron.Name,
1✔
680
                                        UID:                cron.UID,
1✔
681
                                        BlockOwnerDeletion: ptr.To[bool](true),
1✔
682
                                        Controller:         ptr.To[bool](true),
1✔
683
                                },
1✔
684
                        },
1✔
685
                },
1✔
686
                Spec: corev1.PodSpec{
1✔
687
                        TerminationGracePeriodSeconds: ptr.To[int64](0),
1✔
688
                        RestartPolicy:                 corev1.RestartPolicyNever,
1✔
689
                        NodeSelector:                  workloadNodePlacement.NodeSelector,
1✔
690
                        Tolerations:                   workloadNodePlacement.Tolerations,
1✔
691
                        Affinity:                      workloadNodePlacement.Affinity,
1✔
692
                        Volumes: []corev1.Volume{
1✔
693
                                {
1✔
694
                                        Name: "shared-volume",
1✔
695
                                        VolumeSource: corev1.VolumeSource{
1✔
696
                                                EmptyDir: &corev1.EmptyDirVolumeSource{},
1✔
697
                                        },
1✔
698
                                },
1✔
699
                        },
1✔
700
                        InitContainers: []corev1.Container{
1✔
701
                                {
1✔
702
                                        Name:                     "init",
1✔
703
                                        Image:                    r.image,
1✔
704
                                        ImagePullPolicy:          corev1.PullPolicy(r.pullPolicy),
1✔
705
                                        Command:                  []string{"sh", "-c", "cp /usr/bin/cdi-containerimage-server /shared/server"},
1✔
706
                                        VolumeMounts:             []corev1.VolumeMount{{Name: "shared-volume", MountPath: "/shared"}},
1✔
707
                                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
708
                                },
1✔
709
                        },
1✔
710
                        Containers: []corev1.Container{
1✔
711
                                {
1✔
712
                                        Name:                     "image-container",
1✔
713
                                        Image:                    containerImage,
1✔
714
                                        ImagePullPolicy:          corev1.PullAlways,
1✔
715
                                        Command:                  []string{"/shared/server", "-h"},
1✔
716
                                        VolumeMounts:             []corev1.VolumeMount{{Name: "shared-volume", MountPath: "/shared"}},
1✔
717
                                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
718
                                },
1✔
719
                        },
1✔
720
                },
1✔
721
        }
1✔
722

1✔
723
        cc.SetRestrictedSecurityContext(&pod.Spec)
1✔
724
        if pod.Spec.SecurityContext != nil {
2✔
725
                pod.Spec.SecurityContext.FSGroup = nil
1✔
726
        }
1✔
727

728
        return false, r.client.Create(ctx, pod)
1✔
729
}
730

731
func fetchContainerImageDigest(pod *corev1.Pod) (string, error) {
1✔
732
        if len(pod.Status.ContainerStatuses) == 0 {
1✔
733
                return "", nil
×
734
        }
×
735

736
        status := pod.Status.ContainerStatuses[0]
1✔
737
        if status.State.Waiting != nil {
1✔
738
                reason := status.State.Waiting.Reason
×
739
                switch reason {
×
740
                case "ImagePullBackOff", "ErrImagePull", "InvalidImageName":
×
741
                        return "", errors.Errorf("%s %s: %s", common.ImagePullFailureText, status.Image, reason)
×
742
                }
743
                return "", nil
×
744
        }
745

746
        if status.State.Terminated == nil {
1✔
747
                return "", nil
×
748
        }
×
749

750
        imageID := status.ImageID
1✔
751
        if imageID == "" {
1✔
752
                return "", errors.Errorf("Container has no imageID")
×
753
        }
×
754
        idx := strings.Index(imageID, digestSha256Prefix)
1✔
755
        if idx < 0 {
1✔
756
                return "", errors.Errorf("Container image %s ID has no digest: %s", status.Image, imageID)
×
757
        }
×
758

759
        return imageID[idx:], nil
1✔
760
}
761

762
func (r *DataImportCronReconciler) updatePvcDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
763
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
1✔
764
        pvcSource, err := getCronPvcSource(dataImportCron)
1✔
765
        if err != nil {
1✔
766
                return err
×
767
        }
×
768
        ns := pvcSource.Namespace
1✔
769
        if ns == "" {
2✔
770
                ns = dataImportCron.Namespace
1✔
771
        }
1✔
772
        pvc := &corev1.PersistentVolumeClaim{}
1✔
773
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: pvcSource.Name}, pvc); err != nil {
2✔
774
                return err
1✔
775
        }
1✔
776
        digest := fmt.Sprintf("%s%s", digestUIDPrefix, pvc.UID)
1✔
777
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
778
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
779
                log.Info("Updating DataImportCron", "digest", digest)
1✔
780
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
781
        }
1✔
782
        return nil
1✔
783
}
784

785
func (r *DataImportCronReconciler) updateDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat) error {
1✔
786
        log := r.log.WithName("updateDataSource")
1✔
787
        dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
788
        if err != nil {
2✔
789
                if k8serrors.IsNotFound(err) {
2✔
790
                        dataSource = r.newDataSource(dataImportCron)
1✔
791
                        if err := r.client.Create(ctx, dataSource); err != nil {
1✔
792
                                return err
×
793
                        }
×
794
                        log.Info("DataSource created", "name", dataSource.Name, "uid", dataSource.UID)
1✔
795
                } else if errors.Is(err, ErrNotManagedByCron) {
×
796
                        return nil
×
797
                } else {
×
798
                        return err
×
799
                }
×
800
        }
801
        dataSourceCopy := dataSource.DeepCopy()
1✔
802
        r.setDataImportCronResourceLabels(dataImportCron, dataSource)
1✔
803

1✔
804
        sourcePVC := dataImportCron.Status.LastImportedPVC
1✔
805
        populateDataSource(format, dataSource, sourcePVC)
1✔
806

1✔
807
        if !reflect.DeepEqual(dataSource, dataSourceCopy) {
2✔
808
                if err := r.client.Update(ctx, dataSource); err != nil {
1✔
809
                        return err
×
810
                }
×
811
        }
812

813
        return nil
1✔
814
}
815

816
func populateDataSource(format cdiv1.DataImportCronSourceFormat, dataSource *cdiv1.DataSource, sourcePVC *cdiv1.DataVolumeSourcePVC) {
1✔
817
        if sourcePVC == nil {
2✔
818
                return
1✔
819
        }
1✔
820

821
        switch format {
1✔
822
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
823
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
824
                        PVC: sourcePVC,
1✔
825
                }
1✔
826
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
827
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
828
                        Snapshot: &cdiv1.DataVolumeSourceSnapshot{
1✔
829
                                Namespace: sourcePVC.Namespace,
1✔
830
                                Name:      sourcePVC.Name,
1✔
831
                        },
1✔
832
                }
1✔
833
        }
834
}
835

836
func updateDataImportCronOnSuccess(dataImportCron *cdiv1.DataImportCron) error {
1✔
837
        if dataImportCron.Status.CurrentImports == nil {
1✔
838
                return errors.Errorf("No CurrentImports in cron %s", dataImportCron.Name)
×
839
        }
×
840
        sourcePVC := &cdiv1.DataVolumeSourcePVC{
1✔
841
                Namespace: dataImportCron.Namespace,
1✔
842
                Name:      dataImportCron.Status.CurrentImports[0].DataVolumeName,
1✔
843
        }
1✔
844
        if dataImportCron.Status.LastImportedPVC == nil || *dataImportCron.Status.LastImportedPVC != *sourcePVC {
2✔
845
                dataImportCron.Status.LastImportedPVC = sourcePVC
1✔
846
                now := metav1.Now()
1✔
847
                dataImportCron.Status.LastImportTimestamp = &now
1✔
848
        }
1✔
849
        return nil
1✔
850
}
851

852
func updateLastExecutionTimestamp(cron *cdiv1.DataImportCron) error {
1✔
853
        lastTimeStr := cron.Annotations[AnnLastCronTime]
1✔
854
        if lastTimeStr == "" {
2✔
855
                return nil
1✔
856
        }
1✔
857
        lastTime, err := time.Parse(time.RFC3339, lastTimeStr)
1✔
858
        if err != nil {
1✔
859
                return err
×
860
        }
×
861
        if ts := cron.Status.LastExecutionTimestamp; ts == nil || ts.Time != lastTime {
2✔
862
                cron.Status.LastExecutionTimestamp = &metav1.Time{Time: lastTime}
1✔
863
        }
1✔
864
        return nil
1✔
865
}
866

867
func (r *DataImportCronReconciler) createImportDataVolume(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
868
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
869
        digest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
870
        if digest == "" {
1✔
871
                return nil
×
872
        }
×
873
        dvName, err := createDvName(dataSourceName, digest)
1✔
874
        if err != nil {
2✔
875
                return err
1✔
876
        }
1✔
877
        dataImportCron.Status.CurrentImports = []cdiv1.ImportStatus{{DataVolumeName: dvName, Digest: digest}}
1✔
878

1✔
879
        sources := []client.Object{&snapshotv1.VolumeSnapshot{}, &corev1.PersistentVolumeClaim{}}
1✔
880
        for _, src := range sources {
2✔
881
                if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dvName}, src); err != nil {
2✔
882
                        if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
883
                                return err
×
884
                        }
×
885
                } else {
1✔
886
                        if err := r.updateSource(ctx, dataImportCron, src); err != nil {
1✔
887
                                return err
×
888
                        }
×
889
                        // If source exists don't create DV
890
                        return nil
1✔
891
                }
892
        }
893

894
        dv := r.newSourceDataVolume(dataImportCron, dvName)
1✔
895
        if allowed, err := r.authorizeCloneDataVolume(dataImportCron, dv); err != nil {
1✔
896
                return err
×
897
        } else if !allowed {
1✔
898
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse,
×
899
                        "Not authorized to create DataVolume", notAuthorized)
×
900
                return nil
×
901
        }
×
902
        if err := r.client.Create(ctx, dv); err != nil && !k8serrors.IsAlreadyExists(err) {
1✔
903
                return err
×
904
        }
×
905

906
        return nil
1✔
907
}
908

909
func (r *DataImportCronReconciler) authorizeCloneDataVolume(dataImportCron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) (bool, error) {
1✔
910
        if !isPvcSource(dataImportCron) {
2✔
911
                return true, nil
1✔
912
        }
1✔
913
        saName := dataImportCron.Spec.ServiceAccountName
1✔
914
        if saName == "" {
2✔
915
                saName = "default"
1✔
916
        }
1✔
917
        if resp, err := dv.AuthorizeSA(dv.Namespace, dv.Name, r, dataImportCron.Namespace, saName); err != nil {
1✔
NEW
918
                return false, err
×
919
        } else if !resp.Allowed {
1✔
NEW
920
                r.log.Info("Not authorized to create DataVolume", "cron", dataImportCron.Name, "reason", resp.Reason)
×
921
                return false, nil
×
922
        }
×
923

924
        return true, nil
1✔
925
}
926

927
func (r *DataImportCronReconciler) CreateSar(sar *authorizationv1.SubjectAccessReview) (*authorizationv1.SubjectAccessReview, error) {
×
NEW
928
        if err := r.client.Create(context.TODO(), sar); err != nil {
×
NEW
929
                return nil, err
×
NEW
930
        }
×
NEW
931
        return sar, nil
×
932
}
933

934
func (r *DataImportCronReconciler) GetNamespace(name string) (*corev1.Namespace, error) {
1✔
935
        ns := &corev1.Namespace{}
1✔
936
        if err := r.client.Get(context.TODO(), types.NamespacedName{Name: name}, ns); err != nil {
1✔
937
                return nil, err
×
938
        }
×
939
        return ns, nil
1✔
940
}
941

942
func (r *DataImportCronReconciler) GetDataSource(namespace, name string) (*cdiv1.DataSource, error) {
×
943
        das := &cdiv1.DataSource{}
×
944
        if err := r.client.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, das); err != nil {
×
945
                return nil, err
×
946
        }
×
NEW
947
        return das, nil
×
948
}
949

950
func (r *DataImportCronReconciler) handleStorageClassChange(ctx context.Context, dataImportCron *cdiv1.DataImportCron, desiredStorageClass string) error {
1✔
951
        digest, ok := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
952
        if !ok {
1✔
953
                // nothing to delete
×
954
                return nil
×
955
        }
×
956
        name, err := createDvName(dataImportCron.Spec.ManagedDataSource, digest)
1✔
957
        if err != nil {
1✔
958
                return err
×
959
        }
×
960
        om := metav1.ObjectMeta{Name: name, Namespace: dataImportCron.Namespace}
1✔
961
        sources := []client.Object{&snapshotv1.VolumeSnapshot{ObjectMeta: om}, &cdiv1.DataVolume{ObjectMeta: om}, &corev1.PersistentVolumeClaim{ObjectMeta: om}}
1✔
962
        for _, src := range sources {
2✔
963
                if err := r.client.Delete(ctx, src); cc.IgnoreNotFound(err) != nil {
1✔
964
                        return err
×
965
                }
×
966
        }
967
        for _, src := range sources {
2✔
968
                if err := r.client.Get(ctx, client.ObjectKeyFromObject(src), src); err == nil || !k8serrors.IsNotFound(err) {
1✔
969
                        return fmt.Errorf("waiting for old sources to get cleaned up: %w", err)
×
970
                }
×
971
        }
972
        // Only update desired storage class once garbage collection went through
973
        annPatch := fmt.Sprintf(`[{"op":"add","path":"/metadata/annotations/%s","value":"%s" }]`, openapicommon.EscapeJsonPointer(AnnStorageClass), desiredStorageClass)
1✔
974
        err = r.client.Patch(ctx, dataImportCron, client.RawPatch(types.JSONPatchType, []byte(annPatch)))
1✔
975
        if err != nil {
1✔
976
                return err
×
977
        }
×
978

979
        return nil
1✔
980
}
981

982
func (r *DataImportCronReconciler) handleCronFormat(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, format cdiv1.DataImportCronSourceFormat, desiredStorageClass *storagev1.StorageClass) error {
1✔
983
        switch format {
1✔
984
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
985
                return nil
1✔
986
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
987
                return r.handleSnapshot(ctx, dataImportCron, pvc, desiredStorageClass)
1✔
988
        default:
×
989
                return fmt.Errorf("unknown source format for snapshot")
×
990
        }
991
}
992

993
func (r *DataImportCronReconciler) handleSnapshot(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, desiredStorageClass *storagev1.StorageClass) error {
1✔
994
        if pvc == nil {
1✔
995
                return nil
×
996
        }
×
997
        if sc := pvc.Spec.StorageClassName; sc != nil && *sc != desiredStorageClass.Name {
2✔
998
                r.log.Info("Attempt to change storage class, will not try making a snapshot of the old PVC")
1✔
999
                return nil
1✔
1000
        }
1✔
1001
        storageProfile := &cdiv1.StorageProfile{}
1✔
1002
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
1003
                return err
×
1004
        }
×
1005
        className, err := cc.GetSnapshotClassForSmartClone(pvc, &desiredStorageClass.Name, storageProfile.Status.SnapshotClass, r.log, r.client, r.recorder)
1✔
1006
        if err != nil {
1✔
1007
                return err
×
1008
        }
×
1009
        desiredSnapshot := &snapshotv1.VolumeSnapshot{
1✔
1010
                ObjectMeta: metav1.ObjectMeta{
1✔
1011
                        Name:      pvc.Name,
1✔
1012
                        Namespace: dataImportCron.Namespace,
1✔
1013
                        Labels: map[string]string{
1✔
1014
                                common.CDILabelKey:       common.CDILabelValue,
1✔
1015
                                common.CDIComponentLabel: "",
1✔
1016
                        },
1✔
1017
                },
1✔
1018
                Spec: snapshotv1.VolumeSnapshotSpec{
1✔
1019
                        Source: snapshotv1.VolumeSnapshotSource{
1✔
1020
                                PersistentVolumeClaimName: &pvc.Name,
1✔
1021
                        },
1✔
1022
                        VolumeSnapshotClassName: &className,
1✔
1023
                },
1✔
1024
        }
1✔
1025
        r.setDataImportCronResourceLabels(dataImportCron, desiredSnapshot)
1✔
1026
        cc.CopyAllowedLabels(pvc.GetLabels(), desiredSnapshot, false)
1✔
1027

1✔
1028
        currentSnapshot := &snapshotv1.VolumeSnapshot{}
1✔
1029
        if err := r.client.Get(ctx, client.ObjectKeyFromObject(desiredSnapshot), currentSnapshot); err != nil {
2✔
1030
                if !k8serrors.IsNotFound(err) {
1✔
1031
                        return err
×
1032
                }
×
1033
                cc.AddAnnotation(desiredSnapshot, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
1034
                if pvc.Spec.VolumeMode != nil {
2✔
1035
                        cc.AddAnnotation(desiredSnapshot, cc.AnnSourceVolumeMode, string(*pvc.Spec.VolumeMode))
1✔
1036
                }
1✔
1037
                if err := r.client.Create(ctx, desiredSnapshot); err != nil {
1✔
1038
                        return err
×
1039
                }
×
1040
        } else {
1✔
1041
                if cc.IsSnapshotReady(currentSnapshot) {
2✔
1042
                        // Clean up DV/PVC as they are not needed anymore
1✔
1043
                        r.log.Info("Deleting dv/pvc as snapshot is ready", "name", desiredSnapshot.Name)
1✔
1044
                        if err := r.deleteDvPvc(ctx, desiredSnapshot.Name, desiredSnapshot.Namespace); err != nil {
1✔
1045
                                return err
×
1046
                        }
×
1047
                }
1048
        }
1049

1050
        return nil
1✔
1051
}
1052

1053
func (r *DataImportCronReconciler) updateDataImportCronSuccessCondition(dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat, snapshot *snapshotv1.VolumeSnapshot) error {
1✔
1054
        dataImportCron.Status.SourceFormat = &format
1✔
1055

1✔
1056
        switch format {
1✔
1057
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
1058
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
1059
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
1060
                if snapshot == nil {
2✔
1061
                        // Snapshot create/update will trigger reconcile
1✔
1062
                        return nil
1✔
1063
                }
1✔
1064
                if cc.IsSnapshotReady(snapshot) {
2✔
1065
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
1066
                } else {
2✔
1067
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Snapshot of imported data is progressing", inProgress)
1✔
1068
                }
1✔
1069
        default:
×
1070
                return fmt.Errorf("unknown source format for snapshot")
×
1071
        }
1072

1073
        return nil
1✔
1074
}
1075

1076
func (r *DataImportCronReconciler) getSourceFormat(ctx context.Context, desiredStorageClass *storagev1.StorageClass) (cdiv1.DataImportCronSourceFormat, error) {
1✔
1077
        format := cdiv1.DataImportCronSourceFormatPvc
1✔
1078
        if desiredStorageClass == nil {
2✔
1079
                return format, nil
1✔
1080
        }
1✔
1081

1082
        storageProfile := &cdiv1.StorageProfile{}
1✔
1083
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
1084
                return format, err
×
1085
        }
×
1086
        if storageProfile.Status.DataImportCronSourceFormat != nil {
2✔
1087
                format = *storageProfile.Status.DataImportCronSourceFormat
1✔
1088
        }
1✔
1089

1090
        return format, nil
1✔
1091
}
1092

1093
func (r *DataImportCronReconciler) garbageCollectOldImports(ctx context.Context, cron *cdiv1.DataImportCron) error {
1✔
1094
        if cron.Spec.GarbageCollect != nil && *cron.Spec.GarbageCollect != cdiv1.DataImportCronGarbageCollectOutdated {
1✔
1095
                return nil
×
1096
        }
×
1097
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name})
1✔
1098
        if err != nil {
1✔
1099
                return err
×
1100
        }
×
1101

1102
        maxImports := defaultImportsToKeepPerCron
1✔
1103

1✔
1104
        if cron.Spec.ImportsToKeep != nil && *cron.Spec.ImportsToKeep >= 0 {
2✔
1105
                maxImports = int(*cron.Spec.ImportsToKeep)
1✔
1106
        }
1✔
1107

1108
        if err := r.garbageCollectPVCs(ctx, cron.Namespace, cron.Name, selector, maxImports); err != nil {
1✔
1109
                return err
×
1110
        }
×
1111
        if err := r.garbageCollectSnapshots(ctx, cron.Namespace, selector, maxImports); err != nil {
1✔
1112
                return err
×
1113
        }
×
1114

1115
        return nil
1✔
1116
}
1117

1118
func (r *DataImportCronReconciler) garbageCollectPVCs(ctx context.Context, namespace, cronName string, selector labels.Selector, maxImports int) error {
1✔
1119
        pvcList := &corev1.PersistentVolumeClaimList{}
1✔
1120

1✔
1121
        if err := r.client.List(ctx, pvcList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1122
                return err
×
1123
        }
×
1124
        if len(pvcList.Items) > maxImports {
2✔
1125
                sort.Slice(pvcList.Items, func(i, j int) bool {
2✔
1126
                        return pvcList.Items[i].Annotations[AnnLastUseTime] > pvcList.Items[j].Annotations[AnnLastUseTime]
1✔
1127
                })
1✔
1128
                for _, pvc := range pvcList.Items[maxImports:] {
2✔
1129
                        r.log.Info("Deleting dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
1130
                        if err := r.deleteDvPvc(ctx, pvc.Name, pvc.Namespace); err != nil {
1✔
1131
                                return err
×
1132
                        }
×
1133
                }
1134
        }
1135

1136
        dvList := &cdiv1.DataVolumeList{}
1✔
1137
        if err := r.client.List(ctx, dvList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1138
                return err
×
1139
        }
×
1140

1141
        if len(dvList.Items) > maxImports {
2✔
1142
                for _, dv := range dvList.Items {
2✔
1143
                        pvc := &corev1.PersistentVolumeClaim{}
1✔
1144
                        if err := r.client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: dv.Name}, pvc); err != nil {
1✔
1145
                                return err
×
1146
                        }
×
1147

1148
                        if pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
1149
                                r.log.Info("Deleting old version dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
1150
                                if err := r.deleteDvPvc(ctx, dv.Name, dv.Namespace); err != nil {
1✔
1151
                                        return err
×
1152
                                }
×
1153
                        }
1154
                }
1155
        }
1156

1157
        return nil
1✔
1158
}
1159

1160
// deleteDvPvc deletes DV or PVC if DV was GCed
1161
func (r *DataImportCronReconciler) deleteDvPvc(ctx context.Context, name, namespace string) error {
1✔
1162
        om := metav1.ObjectMeta{Name: name, Namespace: namespace}
1✔
1163
        dv := &cdiv1.DataVolume{ObjectMeta: om}
1✔
1164
        if err := r.client.Delete(ctx, dv); err == nil || !k8serrors.IsNotFound(err) {
2✔
1165
                return err
1✔
1166
        }
1✔
1167
        pvc := &corev1.PersistentVolumeClaim{ObjectMeta: om}
1✔
1168
        if err := r.client.Delete(ctx, pvc); err != nil && !k8serrors.IsNotFound(err) {
1✔
1169
                return err
×
1170
        }
×
1171
        return nil
1✔
1172
}
1173

1174
func (r *DataImportCronReconciler) garbageCollectSnapshots(ctx context.Context, namespace string, selector labels.Selector, maxImports int) error {
1✔
1175
        snapList := &snapshotv1.VolumeSnapshotList{}
1✔
1176

1✔
1177
        if err := r.client.List(ctx, snapList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1178
                if meta.IsNoMatchError(err) {
×
1179
                        return nil
×
1180
                }
×
1181
                return err
×
1182
        }
1183
        if len(snapList.Items) > maxImports {
1✔
1184
                sort.Slice(snapList.Items, func(i, j int) bool {
×
1185
                        return snapList.Items[i].Annotations[AnnLastUseTime] > snapList.Items[j].Annotations[AnnLastUseTime]
×
1186
                })
×
1187
                for _, snap := range snapList.Items[maxImports:] {
×
1188
                        r.log.Info("Deleting snapshot", "name", snap.Name, "uid", snap.UID)
×
1189
                        if err := r.client.Delete(ctx, &snap); err != nil && !k8serrors.IsNotFound(err) {
×
1190
                                return err
×
1191
                        }
×
1192
                }
1193
        }
1194

1195
        return nil
1✔
1196
}
1197

1198
func (r *DataImportCronReconciler) cleanup(ctx context.Context, cron types.NamespacedName) error {
1✔
1199
        // Don't keep alerting over a cron thats being deleted, will get set back to 1 again by reconcile loop if needed.
1✔
1200
        metrics.DeleteDataImportCronOutdated(getPrometheusCronLabels(cron.Namespace, cron.Name))
1✔
1201

1✔
1202
        if err := r.deleteJobs(ctx, cron); err != nil {
1✔
1203
                return err
×
1204
        }
×
1205
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name, common.DataImportCronCleanupLabel: "true"})
1✔
1206
        if err != nil {
1✔
1207
                return err
×
1208
        }
×
1209
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: cron.Namespace, LabelSelector: selector}}
1✔
1210
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataSource{}, opts); err != nil {
1✔
1211
                return err
×
1212
        }
×
1213
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataVolume{}, opts); err != nil {
1✔
1214
                return err
×
1215
        }
×
1216
        if err := r.client.DeleteAllOf(ctx, &corev1.PersistentVolumeClaim{}, opts); err != nil {
1✔
1217
                return err
×
1218
        }
×
1219
        if err := r.client.DeleteAllOf(ctx, &snapshotv1.VolumeSnapshot{}, opts); cc.IgnoreIsNoMatchError(err) != nil {
1✔
1220
                return err
×
1221
        }
×
1222
        return nil
1✔
1223
}
1224

1225
func (r *DataImportCronReconciler) deleteJobs(ctx context.Context, cron types.NamespacedName) error {
1✔
1226
        deleteOpts := client.DeleteOptions{PropagationPolicy: ptr.To[metav1.DeletionPropagation](metav1.DeletePropagationBackground)}
1✔
1227
        selector, err := getSelector(map[string]string{common.DataImportCronNsLabel: cron.Namespace, common.DataImportCronLabel: cron.Name})
1✔
1228
        if err != nil {
1✔
1229
                return err
×
1230
        }
×
1231
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: r.cdiNamespace, LabelSelector: selector}, DeleteOptions: deleteOpts}
1✔
1232
        if err := r.client.DeleteAllOf(ctx, &batchv1.CronJob{}, opts); err != nil {
1✔
1233
                return err
×
1234
        }
×
1235
        if err := r.client.DeleteAllOf(ctx, &batchv1.Job{}, opts); err != nil {
1✔
1236
                return err
×
1237
        }
×
1238

1239
        return nil
1✔
1240
}
1241

1242
// NewDataImportCronController creates a new instance of the DataImportCron controller
1243
func NewDataImportCronController(mgr manager.Manager, log logr.Logger, importerImage, pullPolicy string, installerLabels map[string]string) (controller.Controller, error) {
×
1244
        uncachedClient, err := client.New(mgr.GetConfig(), client.Options{
×
1245
                Scheme: mgr.GetScheme(),
×
1246
                Mapper: mgr.GetRESTMapper(),
×
1247
        })
×
1248
        if err != nil {
×
1249
                return nil, err
×
1250
        }
×
1251
        reconciler := &DataImportCronReconciler{
×
1252
                client:          mgr.GetClient(),
×
1253
                uncachedClient:  uncachedClient,
×
1254
                recorder:        mgr.GetEventRecorderFor(dataImportControllerName),
×
1255
                scheme:          mgr.GetScheme(),
×
1256
                log:             log.WithName(dataImportControllerName),
×
1257
                image:           importerImage,
×
1258
                pullPolicy:      pullPolicy,
×
1259
                cdiNamespace:    util.GetNamespace(),
×
1260
                installerLabels: installerLabels,
×
1261
        }
×
1262
        dataImportCronController, err := controller.New(dataImportControllerName, mgr, controller.Options{
×
1263
                MaxConcurrentReconciles: 3,
×
1264
                Reconciler:              reconciler,
×
1265
        })
×
1266
        if err != nil {
×
1267
                return nil, err
×
1268
        }
×
1269
        if err := addDataImportCronControllerWatches(mgr, dataImportCronController); err != nil {
×
1270
                return nil, err
×
1271
        }
×
1272
        log.Info("Initialized DataImportCron controller")
×
1273
        return dataImportCronController, nil
×
1274
}
1275

1276
func getCronName(obj client.Object) string {
×
1277
        return obj.GetLabels()[common.DataImportCronLabel]
×
1278
}
×
1279

1280
func getCronNs(obj client.Object) string {
×
1281
        return obj.GetLabels()[common.DataImportCronNsLabel]
×
1282
}
×
1283

1284
func mapSourceObjectToCron[T client.Object](_ context.Context, obj T) []reconcile.Request {
×
1285
        if cronName := getCronName(obj); cronName != "" {
×
1286
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: cronName, Namespace: obj.GetNamespace()}}}
×
1287
        }
×
1288
        return nil
×
1289
}
1290

1291
func addDataImportCronControllerWatches(mgr manager.Manager, c controller.Controller) error {
×
1292
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataImportCron{}, &handler.TypedEnqueueRequestForObject[*cdiv1.DataImportCron]{})); err != nil {
×
1293
                return err
×
1294
        }
×
1295

1296
        mapStorageProfileToCron := func(ctx context.Context, obj *cdiv1.StorageProfile) []reconcile.Request {
×
1297
                // TODO: Get rid of this after at least one version; use indexer on storage class annotation instead
×
1298
                // Otherwise we risk losing the storage profile event
×
1299
                var crons cdiv1.DataImportCronList
×
1300
                if err := mgr.GetClient().List(ctx, &crons); err != nil {
×
1301
                        c.GetLogger().Error(err, "Unable to list DataImportCrons")
×
1302
                        return nil
×
1303
                }
×
1304
                // Storage profiles are 1:1 to storage classes
1305
                scName := obj.GetName()
×
1306
                var reqs []reconcile.Request
×
1307
                for _, cron := range crons.Items {
×
1308
                        dataVolume := cron.Spec.Template
×
1309
                        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
×
1310
                        templateSc, err := cc.GetStorageClassByNameWithVirtFallback(ctx, mgr.GetClient(), explicitScName, dataVolume.Spec.ContentType)
×
1311
                        if err != nil || templateSc == nil {
×
1312
                                c.GetLogger().Error(err, "Unable to get storage class", "templateSc", templateSc)
×
1313
                                return reqs
×
1314
                        }
×
1315
                        if templateSc.Name == scName {
×
1316
                                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: cron.Namespace, Name: cron.Name}})
×
1317
                        }
×
1318
                }
1319
                return reqs
×
1320
        }
1321

1322
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataVolume{},
×
1323
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataVolume](mapSourceObjectToCron),
×
1324
                predicate.TypedFuncs[*cdiv1.DataVolume]{
×
1325
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataVolume]) bool { return false },
×
1326
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataVolume]) bool { return getCronName(e.ObjectNew) != "" },
×
1327
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataVolume]) bool { return getCronName(e.Object) != "" },
×
1328
                },
1329
        )); err != nil {
×
1330
                return err
×
1331
        }
×
1332

1333
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataSource{},
×
1334
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataSource](mapSourceObjectToCron),
×
1335
                predicate.TypedFuncs[*cdiv1.DataSource]{
×
1336
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataSource]) bool { return false },
×
1337
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataSource]) bool { return getCronName(e.ObjectNew) != "" },
×
1338
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataSource]) bool { return getCronName(e.Object) != "" },
×
1339
                },
1340
        )); err != nil {
×
1341
                return err
×
1342
        }
×
1343

1344
        if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{},
×
1345
                handler.TypedEnqueueRequestsFromMapFunc[*corev1.PersistentVolumeClaim](mapSourceObjectToCron),
×
1346
                predicate.TypedFuncs[*corev1.PersistentVolumeClaim]{
×
1347
                        CreateFunc: func(event.TypedCreateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1348
                        UpdateFunc: func(event.TypedUpdateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1349
                        DeleteFunc: func(e event.TypedDeleteEvent[*corev1.PersistentVolumeClaim]) bool { return getCronName(e.Object) != "" },
×
1350
                },
1351
        )); err != nil {
×
1352
                return err
×
1353
        }
×
1354

1355
        if err := addDefaultStorageClassUpdateWatch(mgr, c); err != nil {
×
1356
                return err
×
1357
        }
×
1358

1359
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.StorageProfile{},
×
1360
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.StorageProfile](mapStorageProfileToCron),
×
1361
                predicate.TypedFuncs[*cdiv1.StorageProfile]{
×
1362
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.StorageProfile]) bool { return true },
×
1363
                        DeleteFunc: func(event.TypedDeleteEvent[*cdiv1.StorageProfile]) bool { return false },
×
1364
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.StorageProfile]) bool {
×
1365
                                return e.ObjectOld.Status.DataImportCronSourceFormat != e.ObjectNew.Status.DataImportCronSourceFormat
×
1366
                        },
×
1367
                },
1368
        )); err != nil {
×
1369
                return err
×
1370
        }
×
1371

1372
        mapCronJobToCron := func(_ context.Context, obj *batchv1.CronJob) []reconcile.Request {
×
1373
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Namespace: getCronNs(obj), Name: getCronName(obj)}}}
×
1374
        }
×
1375

1376
        if err := c.Watch(source.Kind(mgr.GetCache(), &batchv1.CronJob{},
×
1377
                handler.TypedEnqueueRequestsFromMapFunc[*batchv1.CronJob](mapCronJobToCron),
×
1378
                predicate.TypedFuncs[*batchv1.CronJob]{
×
1379
                        CreateFunc: func(e event.TypedCreateEvent[*batchv1.CronJob]) bool {
×
1380
                                return getCronName(e.Object) != "" && getCronNs(e.Object) != ""
×
1381
                        },
×
1382
                        DeleteFunc: func(event.TypedDeleteEvent[*batchv1.CronJob]) bool { return false },
×
1383
                        UpdateFunc: func(event.TypedUpdateEvent[*batchv1.CronJob]) bool { return false },
×
1384
                },
1385
        )); err != nil {
×
1386
                return err
×
1387
        }
×
1388

1389
        if err := mgr.GetClient().List(context.TODO(), &snapshotv1.VolumeSnapshotList{}); err != nil {
×
1390
                if meta.IsNoMatchError(err) {
×
1391
                        // Back out if there's no point to attempt watch
×
1392
                        return nil
×
1393
                }
×
1394
                if !cc.IsErrCacheNotStarted(err) {
×
1395
                        return err
×
1396
                }
×
1397
        }
1398
        if err := c.Watch(source.Kind(mgr.GetCache(), &snapshotv1.VolumeSnapshot{},
×
1399
                handler.TypedEnqueueRequestsFromMapFunc[*snapshotv1.VolumeSnapshot](mapSourceObjectToCron),
×
1400
                predicate.TypedFuncs[*snapshotv1.VolumeSnapshot]{
×
1401
                        CreateFunc: func(event.TypedCreateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1402
                        UpdateFunc: func(event.TypedUpdateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1403
                        DeleteFunc: func(e event.TypedDeleteEvent[*snapshotv1.VolumeSnapshot]) bool { return getCronName(e.Object) != "" },
×
1404
                },
1405
        )); err != nil {
×
1406
                return err
×
1407
        }
×
1408

1409
        return nil
×
1410
}
1411

1412
// addDefaultStorageClassUpdateWatch watches for default/virt default storage class updates
1413
func addDefaultStorageClassUpdateWatch(mgr manager.Manager, c controller.Controller) error {
×
1414
        if err := c.Watch(source.Kind(mgr.GetCache(), &storagev1.StorageClass{},
×
1415
                handler.TypedEnqueueRequestsFromMapFunc[*storagev1.StorageClass](
×
1416
                        func(ctx context.Context, obj *storagev1.StorageClass) []reconcile.Request {
×
1417
                                log := c.GetLogger().WithName("DefaultStorageClassUpdateWatch")
×
1418
                                log.Info("Update", "sc", obj.GetName(),
×
1419
                                        "default", obj.GetAnnotations()[cc.AnnDefaultStorageClass] == "true",
×
1420
                                        "defaultVirt", obj.GetAnnotations()[cc.AnnDefaultVirtStorageClass] == "true")
×
1421
                                reqs, err := getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx, mgr.GetClient())
×
1422
                                if err != nil {
×
1423
                                        log.Error(err, "Failed getting DataImportCrons with pending PVCs")
×
1424
                                }
×
1425
                                return reqs
×
1426
                        },
1427
                ),
1428
                predicate.TypedFuncs[*storagev1.StorageClass]{
1429
                        CreateFunc: func(event.TypedCreateEvent[*storagev1.StorageClass]) bool { return false },
×
1430
                        DeleteFunc: func(event.TypedDeleteEvent[*storagev1.StorageClass]) bool { return false },
×
1431
                        UpdateFunc: func(e event.TypedUpdateEvent[*storagev1.StorageClass]) bool {
×
1432
                                return (e.ObjectNew.Annotations[cc.AnnDefaultStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultStorageClass]) ||
×
1433
                                        (e.ObjectNew.Annotations[cc.AnnDefaultVirtStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultVirtStorageClass])
×
1434
                        },
×
1435
                },
1436
        )); err != nil {
×
1437
                return err
×
1438
        }
×
1439

1440
        return nil
×
1441
}
1442

1443
func getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx context.Context, c client.Client) ([]reconcile.Request, error) {
×
1444
        dicList := &cdiv1.DataImportCronList{}
×
1445
        if err := c.List(ctx, dicList); err != nil {
×
1446
                return nil, err
×
1447
        }
×
1448
        reqs := []reconcile.Request{}
×
1449
        for _, dic := range dicList.Items {
×
1450
                if cc.GetStorageClassFromDVSpec(&dic.Spec.Template) != nil {
×
1451
                        continue
×
1452
                }
1453

1454
                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Name: dic.Name, Namespace: dic.Namespace}})
×
1455
        }
1456

1457
        return reqs, nil
×
1458
}
1459

1460
func (r *DataImportCronReconciler) deleteOutdatedPendingPvc(ctx context.Context, pvc *corev1.PersistentVolumeClaim, desiredStorageClass, cronName string) (bool, error) {
1✔
1461
        if pvc == nil || pvc.Status.Phase != corev1.ClaimPending || pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
1462
                return false, nil
1✔
1463
        }
1✔
1464

1465
        sc := pvc.Spec.StorageClassName
1✔
1466
        if sc == nil || *sc == desiredStorageClass {
2✔
1467
                return false, nil
1✔
1468
        }
1✔
1469

1470
        r.log.Info("Delete pending pvc", "name", pvc.Name, "ns", pvc.Namespace, "sc", *sc)
1✔
1471
        if err := r.client.Delete(ctx, pvc); cc.IgnoreNotFound(err) != nil {
1✔
1472
                return false, err
×
1473
        }
×
1474

1475
        return true, nil
1✔
1476
}
1477

1478
func (r *DataImportCronReconciler) cronJobExistsAndUpdated(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
1479
        cronJob := &batchv1.CronJob{}
1✔
1480
        cronJobKey := types.NamespacedName{Namespace: r.cdiNamespace, Name: GetCronJobName(cron)}
1✔
1481
        if err := r.client.Get(ctx, cronJobKey, cronJob); err != nil {
2✔
1482
                return false, cc.IgnoreNotFound(err)
1✔
1483
        }
1✔
1484

1485
        cronJobCopy := cronJob.DeepCopy()
1✔
1486
        if err := r.initCronJob(cron, cronJobCopy); err != nil {
1✔
1487
                return false, err
×
1488
        }
×
1489

1490
        if !reflect.DeepEqual(cronJob, cronJobCopy) {
2✔
1491
                r.log.Info("Updating CronJob", "name", cronJob.GetName())
1✔
1492
                if err := r.client.Update(ctx, cronJobCopy); err != nil {
1✔
1493
                        return false, cc.IgnoreNotFound(err)
×
1494
                }
×
1495
        }
1496
        return true, nil
1✔
1497
}
1498

1499
func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batchv1.CronJob, error) {
1✔
1500
        cronJob := &batchv1.CronJob{
1✔
1501
                ObjectMeta: metav1.ObjectMeta{
1✔
1502
                        Name:      GetCronJobName(cron),
1✔
1503
                        Namespace: r.cdiNamespace,
1✔
1504
                },
1✔
1505
        }
1✔
1506
        if err := r.initCronJob(cron, cronJob); err != nil {
1✔
1507
                return nil, err
×
1508
        }
×
1509
        return cronJob, nil
1✔
1510
}
1511

1512
// InitPollerPod inits poller Pod
1513
func InitPollerPod(c client.Client, cron *cdiv1.DataImportCron, pod *corev1.PodTemplateSpec, image string, pullPolicy corev1.PullPolicy, log logr.Logger) error {
1✔
1514
        regSource, err := getCronRegistrySource(cron)
1✔
1515
        if err != nil {
1✔
1516
                return err
×
1517
        }
×
1518
        if regSource.URL == nil {
1✔
1519
                return errors.Errorf("No URL source in cron %s", cron.Name)
×
1520
        }
×
1521
        cdiConfig := &cdiv1.CDIConfig{}
1✔
1522
        if err := c.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiConfig); err != nil {
1✔
1523
                return err
×
1524
        }
×
1525
        insecureTLS, err := IsInsecureTLS(*regSource.URL, cdiConfig, log)
1✔
1526
        if err != nil {
1✔
1527
                return err
×
1528
        }
×
1529
        container := corev1.Container{
1✔
1530
                Name:  "cdi-source-update-poller",
1✔
1531
                Image: image,
1✔
1532
                Command: []string{
1✔
1533
                        "/usr/bin/cdi-source-update-poller",
1✔
1534
                        "-ns", cron.Namespace,
1✔
1535
                        "-cron", cron.Name,
1✔
1536
                        "-url", *regSource.URL,
1✔
1537
                },
1✔
1538
                ImagePullPolicy:          pullPolicy,
1✔
1539
                TerminationMessagePath:   corev1.TerminationMessagePathDefault,
1✔
1540
                TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
1541
        }
1✔
1542

1✔
1543
        var volumes []corev1.Volume
1✔
1544
        hasCertConfigMap := regSource.CertConfigMap != nil && *regSource.CertConfigMap != ""
1✔
1545
        if hasCertConfigMap {
1✔
1546
                vm := corev1.VolumeMount{
×
1547
                        Name:      CertVolName,
×
1548
                        MountPath: common.ImporterCertDir,
×
1549
                }
×
1550
                container.VolumeMounts = append(container.VolumeMounts, vm)
×
1551
                container.Command = append(container.Command, "-certdir", common.ImporterCertDir)
×
1552
                volumes = append(volumes, createConfigMapVolume(CertVolName, *regSource.CertConfigMap))
×
1553
        }
×
1554

1555
        if volName, _ := GetImportProxyConfig(cdiConfig, common.ImportProxyConfigMapName); volName != "" {
2✔
1556
                vm := corev1.VolumeMount{
1✔
1557
                        Name:      ProxyCertVolName,
1✔
1558
                        MountPath: common.ImporterProxyCertDir,
1✔
1559
                }
1✔
1560
                container.VolumeMounts = append(container.VolumeMounts, vm)
1✔
1561
                volumes = append(volumes, createConfigMapVolume(ProxyCertVolName, volName))
1✔
1562
        }
1✔
1563

1564
        if regSource.SecretRef != nil && *regSource.SecretRef != "" {
1✔
1565
                container.Env = append(container.Env,
×
1566
                        corev1.EnvVar{
×
1567
                                Name: common.ImporterAccessKeyID,
×
1568
                                ValueFrom: &corev1.EnvVarSource{
×
1569
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1570
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1571
                                                        Name: *regSource.SecretRef,
×
1572
                                                },
×
1573
                                                Key: common.KeyAccess,
×
1574
                                        },
×
1575
                                },
×
1576
                        },
×
1577
                        corev1.EnvVar{
×
1578
                                Name: common.ImporterSecretKey,
×
1579
                                ValueFrom: &corev1.EnvVarSource{
×
1580
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1581
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1582
                                                        Name: *regSource.SecretRef,
×
1583
                                                },
×
1584
                                                Key: common.KeySecret,
×
1585
                                        },
×
1586
                                },
×
1587
                        },
×
1588
                )
×
1589
        }
×
1590

1591
        addEnvVar := func(varName, value string) {
2✔
1592
                container.Env = append(container.Env, corev1.EnvVar{Name: varName, Value: value})
1✔
1593
        }
1✔
1594

1595
        if insecureTLS {
1✔
1596
                addEnvVar(common.InsecureTLSVar, "true")
×
1597
        }
×
1598

1599
        addEnvVarFromImportProxyConfig := func(varName string) {
2✔
1600
                if value, err := GetImportProxyConfig(cdiConfig, varName); err == nil {
2✔
1601
                        addEnvVar(varName, value)
1✔
1602
                }
1✔
1603
        }
1604

1605
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTP)
1✔
1606
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTPS)
1✔
1607
        addEnvVarFromImportProxyConfig(common.ImportProxyNoProxy)
1✔
1608

1✔
1609
        imagePullSecrets, err := cc.GetImagePullSecrets(c)
1✔
1610
        if err != nil {
1✔
1611
                return err
×
1612
        }
×
1613
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(context.TODO(), c)
1✔
1614
        if err != nil {
1✔
1615
                return err
×
1616
        }
×
1617

1618
        podSpec := &pod.Spec
1✔
1619

1✔
1620
        podSpec.RestartPolicy = corev1.RestartPolicyNever
1✔
1621
        podSpec.TerminationGracePeriodSeconds = ptr.To[int64](0)
1✔
1622
        podSpec.Containers = []corev1.Container{container}
1✔
1623
        podSpec.ServiceAccountName = common.CronJobServiceAccountName
1✔
1624
        podSpec.Volumes = volumes
1✔
1625
        podSpec.ImagePullSecrets = imagePullSecrets
1✔
1626
        podSpec.NodeSelector = workloadNodePlacement.NodeSelector
1✔
1627
        podSpec.Tolerations = workloadNodePlacement.Tolerations
1✔
1628
        podSpec.Affinity = workloadNodePlacement.Affinity
1✔
1629

1✔
1630
        cc.SetRestrictedSecurityContext(podSpec)
1✔
1631
        // No need for specifid uid/fsgroup here since this doesn't write or use qemu
1✔
1632
        if podSpec.SecurityContext != nil {
2✔
1633
                podSpec.SecurityContext.FSGroup = nil
1✔
1634
        }
1✔
1635
        if podSpec.Containers[0].SecurityContext != nil {
2✔
1636
                podSpec.Containers[0].SecurityContext.RunAsUser = nil
1✔
1637
        }
1✔
1638

1639
        if pod.Labels == nil {
2✔
1640
                pod.Labels = map[string]string{}
1✔
1641
        }
1✔
1642
        pod.Labels[common.DataImportCronPollerLabel] = ""
1✔
1643

1✔
1644
        return nil
1✔
1645
}
1646

1647
func (r *DataImportCronReconciler) initCronJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) error {
1✔
1648
        cronJobSpec := &cronJob.Spec
1✔
1649
        cronJobSpec.Schedule = cron.Spec.Schedule
1✔
1650
        cronJobSpec.ConcurrencyPolicy = batchv1.ForbidConcurrent
1✔
1651
        cronJobSpec.SuccessfulJobsHistoryLimit = ptr.To[int32](1)
1✔
1652
        cronJobSpec.FailedJobsHistoryLimit = ptr.To[int32](1)
1✔
1653

1✔
1654
        jobSpec := &cronJobSpec.JobTemplate.Spec
1✔
1655
        jobSpec.BackoffLimit = ptr.To[int32](2)
1✔
1656
        jobSpec.TTLSecondsAfterFinished = ptr.To[int32](10)
1✔
1657
        cc.AddAnnotation(&jobSpec.Template, secv1.RequiredSCCAnnotation, common.RestrictedSCCName)
1✔
1658

1✔
1659
        pod := &jobSpec.Template
1✔
1660
        if err := InitPollerPod(r.client, cron, pod, r.image, corev1.PullPolicy(r.pullPolicy), r.log); err != nil {
1✔
1661
                return err
×
1662
        }
×
1663
        if err := r.setJobCommon(cron, cronJob); err != nil {
1✔
1664
                return err
×
1665
        }
×
1666
        return nil
1✔
1667
}
1668

1669
func (r *DataImportCronReconciler) newInitialJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) (*batchv1.Job, error) {
1✔
1670
        job := &batchv1.Job{
1✔
1671
                ObjectMeta: metav1.ObjectMeta{
1✔
1672
                        Name:      GetInitialJobName(cron),
1✔
1673
                        Namespace: cronJob.Namespace,
1✔
1674
                },
1✔
1675
                Spec: cronJob.Spec.JobTemplate.Spec,
1✔
1676
        }
1✔
1677
        if err := r.setJobCommon(cron, job); err != nil {
1✔
1678
                return nil, err
×
1679
        }
×
1680
        return job, nil
1✔
1681
}
1682

1683
func (r *DataImportCronReconciler) setJobCommon(cron *cdiv1.DataImportCron, obj metav1.Object) error {
1✔
1684
        if err := operator.SetOwnerRuntime(r.uncachedClient, obj); err != nil {
1✔
1685
                return err
×
1686
        }
×
1687
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1688
        labels := obj.GetLabels()
1✔
1689
        labels[common.DataImportCronNsLabel] = cron.Namespace
1✔
1690
        labels[common.DataImportCronLabel] = cron.Name
1✔
1691
        obj.SetLabels(labels)
1✔
1692
        return nil
1✔
1693
}
1694

1695
func (r *DataImportCronReconciler) newSourceDataVolume(cron *cdiv1.DataImportCron, dataVolumeName string) *cdiv1.DataVolume {
1✔
1696
        dv := cron.Spec.Template.DeepCopy()
1✔
1697
        if isCronRegistrySource(cron) {
2✔
1698
                var digestedURL string
1✔
1699
                if isURLSource(cron) {
2✔
1700
                        digestedURL = untagDigestedDockerURL(*dv.Spec.Source.Registry.URL + "@" + cron.Annotations[AnnSourceDesiredDigest])
1✔
1701
                } else if isImageStreamSource(cron) {
3✔
1702
                        // No way to import image stream by name when we want specific digest, so we use its docker reference
1✔
1703
                        digestedURL = "docker://" + cron.Annotations[AnnImageStreamDockerRef]
1✔
1704
                        dv.Spec.Source.Registry.ImageStream = nil
1✔
1705
                }
1✔
1706
                dv.Spec.Source.Registry.URL = &digestedURL
1✔
1707
        }
1708
        dv.Name = dataVolumeName
1✔
1709
        dv.Namespace = cron.Namespace
1✔
1710
        r.setDataImportCronResourceLabels(cron, dv)
1✔
1711
        cc.AddAnnotation(dv, cc.AnnImmediateBinding, "true")
1✔
1712
        cc.AddAnnotation(dv, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
1713
        passCronAnnotationToDv(cron, dv, cc.AnnPodRetainAfterCompletion)
1✔
1714

1✔
1715
        for _, defaultInstanceTypeLabel := range cc.DefaultInstanceTypeLabels {
2✔
1716
                passCronLabelToDv(cron, dv, defaultInstanceTypeLabel)
1✔
1717
        }
1✔
1718

1719
        passCronLabelToDv(cron, dv, cc.LabelDynamicCredentialSupport)
1✔
1720

1✔
1721
        return dv
1✔
1722
}
1723

1724
func (r *DataImportCronReconciler) setDataImportCronResourceLabels(cron *cdiv1.DataImportCron, obj metav1.Object) {
1✔
1725
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1726
        cc.CopyAllowedLabels(cron.GetLabels(), obj, true)
1✔
1727
        labels := obj.GetLabels()
1✔
1728
        labels[common.DataImportCronLabel] = cron.Name
1✔
1729
        if cron.Spec.RetentionPolicy != nil && *cron.Spec.RetentionPolicy == cdiv1.DataImportCronRetainNone {
2✔
1730
                labels[common.DataImportCronCleanupLabel] = "true"
1✔
1731
        }
1✔
1732
        obj.SetLabels(labels)
1✔
1733
}
1734

1735
func untagDigestedDockerURL(dockerURL string) string {
1✔
1736
        if u, err := url.Parse(dockerURL); err == nil {
2✔
1737
                url := u.Host + u.Path
1✔
1738
                subs := reference.ReferenceRegexp.FindStringSubmatch(url)
1✔
1739
                // Check for tag
1✔
1740
                if len(subs) > 2 && len(subs[2]) > 0 {
2✔
1741
                        if untaggedRef, err := reference.ParseDockerRef(url); err == nil {
2✔
1742
                                return u.Scheme + "://" + untaggedRef.String()
1✔
1743
                        }
1✔
1744
                }
1745
        }
1746
        return dockerURL
1✔
1747
}
1748

1749
func passCronLabelToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1750
        if val := cron.Labels[ann]; val != "" {
2✔
1751
                cc.AddLabel(dv, ann, val)
1✔
1752
        }
1✔
1753
}
1754

1755
func passCronAnnotationToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1756
        if val := cron.Annotations[ann]; val != "" {
1✔
1757
                cc.AddAnnotation(dv, ann, val)
×
1758
        }
×
1759
}
1760

1761
func (r *DataImportCronReconciler) newDataSource(cron *cdiv1.DataImportCron) *cdiv1.DataSource {
1✔
1762
        dataSource := &cdiv1.DataSource{
1✔
1763
                ObjectMeta: metav1.ObjectMeta{
1✔
1764
                        Name:      cron.Spec.ManagedDataSource,
1✔
1765
                        Namespace: cron.Namespace,
1✔
1766
                },
1✔
1767
        }
1✔
1768
        util.SetRecommendedLabels(dataSource, r.installerLabels, common.CDIControllerName)
1✔
1769
        dataSource.Labels[common.DataImportCronLabel] = cron.Name
1✔
1770
        cc.CopyAllowedLabels(cron.GetLabels(), dataSource, true)
1✔
1771
        return dataSource
1✔
1772
}
1✔
1773

1774
// Create DataVolume name based on the DataSource name + prefix of the digest
1775
func createDvName(prefix, digest string) (string, error) {
1✔
1776
        digestPrefix := ""
1✔
1777
        if strings.HasPrefix(digest, digestSha256Prefix) {
2✔
1778
                digestPrefix = digestSha256Prefix
1✔
1779
        } else if strings.HasPrefix(digest, digestUIDPrefix) {
3✔
1780
                digestPrefix = digestUIDPrefix
1✔
1781
        } else {
2✔
1782
                return "", errors.Errorf("Digest has no supported prefix")
1✔
1783
        }
1✔
1784
        fromIdx := len(digestPrefix)
1✔
1785
        toIdx := fromIdx + digestDvNameSuffixLength
1✔
1786
        if len(digest) < toIdx {
2✔
1787
                return "", errors.Errorf("Digest is too short")
1✔
1788
        }
1✔
1789
        return naming.GetResourceName(prefix, digest[fromIdx:toIdx]), nil
1✔
1790
}
1791

1792
// GetCronJobName get CronJob name based on cron name and UID
1793
func GetCronJobName(cron *cdiv1.DataImportCron) string {
1✔
1794
        return naming.GetResourceName(cron.Name, string(cron.UID)[:cronJobUIDSuffixLength])
1✔
1795
}
1✔
1796

1797
// GetInitialJobName get initial job name based on cron name and UID
1798
func GetInitialJobName(cron *cdiv1.DataImportCron) string {
1✔
1799
        return naming.GetResourceName("initial-job", GetCronJobName(cron))
1✔
1800
}
1✔
1801

1802
func getPollerPodName(cron *cdiv1.DataImportCron) string {
1✔
1803
        return naming.GetResourceName("poller-"+cron.Name, string(cron.UID)[:8])
1✔
1804
}
1✔
1805

1806
func getSelector(matchLabels map[string]string) (labels.Selector, error) {
1✔
1807
        return metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: matchLabels})
1✔
1808
}
1✔
1809

1810
func inferVolumeModeForSnapshot(ctx context.Context, client client.Client, cron *cdiv1.DataImportCron) (*corev1.PersistentVolumeMode, error) {
1✔
1811
        dv := &cron.Spec.Template
1✔
1812

1✔
1813
        if explicitVolumeMode := getVolumeModeFromDVSpec(dv); explicitVolumeMode != nil {
1✔
1814
                return explicitVolumeMode, nil
×
1815
        }
×
1816

1817
        accessModes := getAccessModesFromDVSpec(dv)
1✔
1818
        inferredPvc := &corev1.PersistentVolumeClaim{
1✔
1819
                Spec: corev1.PersistentVolumeClaimSpec{
1✔
1820
                        StorageClassName: cc.GetStorageClassFromDVSpec(dv),
1✔
1821
                        AccessModes:      accessModes,
1✔
1822
                        VolumeMode:       ptr.To(cdiv1.PersistentVolumeFromStorageProfile),
1✔
1823
                        Resources: corev1.VolumeResourceRequirements{
1✔
1824
                                Requests: corev1.ResourceList{
1✔
1825
                                        // Doesn't matter
1✔
1826
                                        corev1.ResourceStorage: resource.MustParse("1Gi"),
1✔
1827
                                },
1✔
1828
                        },
1✔
1829
                },
1✔
1830
        }
1✔
1831
        if err := dvc.RenderPvc(ctx, client, inferredPvc); err != nil {
1✔
1832
                return nil, err
×
1833
        }
×
1834

1835
        return inferredPvc.Spec.VolumeMode, nil
1✔
1836
}
1837

1838
// getVolumeModeFromDVSpec returns the volume mode from DataVolume PVC or Storage spec
1839
func getVolumeModeFromDVSpec(dv *cdiv1.DataVolume) *corev1.PersistentVolumeMode {
1✔
1840
        if dv.Spec.PVC != nil {
1✔
1841
                return dv.Spec.PVC.VolumeMode
×
1842
        }
×
1843

1844
        if dv.Spec.Storage != nil {
2✔
1845
                return dv.Spec.Storage.VolumeMode
1✔
1846
        }
1✔
1847

1848
        return nil
×
1849
}
1850

1851
// getAccessModesFromDVSpec returns the access modes from DataVolume PVC or Storage spec
1852
func getAccessModesFromDVSpec(dv *cdiv1.DataVolume) []corev1.PersistentVolumeAccessMode {
1✔
1853
        if dv.Spec.PVC != nil {
1✔
1854
                return dv.Spec.PVC.AccessModes
×
1855
        }
×
1856

1857
        if dv.Spec.Storage != nil {
2✔
1858
                return dv.Spec.Storage.AccessModes
1✔
1859
        }
1✔
1860

1861
        return nil
×
1862
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc