• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5703

08 Dec 2025 06:25PM UTC coverage: 58.647% (-0.005%) from 58.652%
#5703

push

travis-ci

web-flow
Simplify DataImportCron ServiceAccount authorization (#3970)

* Revert "Authorize DataImportCron PVC clone based on creator UserInfo (#3946)"

This reverts commit b578f2a82.

Signed-off-by: Arnon Gilboa <agilboa@redhat.com>

* Simplify DataImportCron ServiceAccount authorization

Add ServiceAccountName to DataImportCron spec, replacing CreatedBy
which was added in #3946.

In case of DataImportCron with PVC source, the controller checks the
ServiceAccount is authorized to clone the source PVC.

Signed-off-by: Arnon Gilboa <agilboa@redhat.com>

---------

Signed-off-by: Arnon Gilboa <agilboa@redhat.com>

10 of 13 new or added lines in 1 file covered. (76.92%)

5 existing lines in 2 files now uncovered.

17305 of 29507 relevant lines covered (58.65%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.68
/pkg/controller/dataimportcron-controller.go
1
/*
2
Copyright 2021 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "fmt"
22
        "net/url"
23
        "reflect"
24
        "sort"
25
        "strings"
26
        "time"
27

28
        "github.com/containers/image/v5/docker/reference"
29
        "github.com/go-logr/logr"
30
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
31
        imagev1 "github.com/openshift/api/image/v1"
32
        secv1 "github.com/openshift/api/security/v1"
33
        "github.com/pkg/errors"
34
        cronexpr "github.com/robfig/cron/v3"
35

36
        authorizationv1 "k8s.io/api/authorization/v1"
37
        batchv1 "k8s.io/api/batch/v1"
38
        corev1 "k8s.io/api/core/v1"
39
        storagev1 "k8s.io/api/storage/v1"
40
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
41
        "k8s.io/apimachinery/pkg/api/meta"
42
        "k8s.io/apimachinery/pkg/api/resource"
43
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
44
        "k8s.io/apimachinery/pkg/labels"
45
        "k8s.io/apimachinery/pkg/runtime"
46
        "k8s.io/apimachinery/pkg/types"
47
        "k8s.io/client-go/tools/record"
48
        openapicommon "k8s.io/kube-openapi/pkg/common"
49
        "k8s.io/utils/ptr"
50

51
        "sigs.k8s.io/controller-runtime/pkg/client"
52
        "sigs.k8s.io/controller-runtime/pkg/controller"
53
        "sigs.k8s.io/controller-runtime/pkg/event"
54
        "sigs.k8s.io/controller-runtime/pkg/handler"
55
        "sigs.k8s.io/controller-runtime/pkg/manager"
56
        "sigs.k8s.io/controller-runtime/pkg/predicate"
57
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
58
        "sigs.k8s.io/controller-runtime/pkg/source"
59

60
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
61
        "kubevirt.io/containerized-data-importer/pkg/common"
62
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
63
        dvc "kubevirt.io/containerized-data-importer/pkg/controller/datavolume"
64
        metrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-controller"
65
        "kubevirt.io/containerized-data-importer/pkg/operator"
66
        "kubevirt.io/containerized-data-importer/pkg/util"
67
        "kubevirt.io/containerized-data-importer/pkg/util/naming"
68
)
69

70
const (
71
        // ErrDataSourceAlreadyManaged provides a const to indicate DataSource already managed error
72
        ErrDataSourceAlreadyManaged = "ErrDataSourceAlreadyManaged"
73
        // MessageDataSourceAlreadyManaged provides a const to form DataSource already managed error message
74
        MessageDataSourceAlreadyManaged = "DataSource %s is already managed by DataImportCron %s"
75
)
76

77
// DataImportCronReconciler members
78
type DataImportCronReconciler struct {
79
        // client is the API client the reconciler methods use for Get/Create/Update/Delete calls.
        client          client.Client
80
        uncachedClient  client.Client
81
        // recorder emits events on the DataImportCron (e.g. the "already managed" warning).
        recorder        record.EventRecorder
82
        scheme          *runtime.Scheme
83
        // log is the base logger; methods derive named/valued loggers from it.
        log             logr.Logger
84
        image           string
85
        pullPolicy      string
86
        cdiNamespace    string
87
        installerLabels map[string]string
88
}
89

90
const (
91
        // AnnSourceDesiredDigest is the digest of the pending updated image
92
        AnnSourceDesiredDigest = cc.AnnAPIGroup + "/storage.import.sourceDesiredDigest"
93
        // AnnImageStreamDockerRef is the ImageStream Docker reference
94
        AnnImageStreamDockerRef = cc.AnnAPIGroup + "/storage.import.imageStreamDockerRef"
95
        // AnnNextCronTime is the next time stamp which satisfies the cron expression
96
        AnnNextCronTime = cc.AnnAPIGroup + "/storage.import.nextCronTime"
97
        // AnnLastCronTime is the cron last execution time stamp
98
        AnnLastCronTime = cc.AnnAPIGroup + "/storage.import.lastCronTime"
99
        // AnnLastUseTime is the PVC last use time stamp
100
        AnnLastUseTime = cc.AnnAPIGroup + "/storage.import.lastUseTime"
101
        // AnnStorageClass is the cron DV's storage class
102
        AnnStorageClass = cc.AnnAPIGroup + "/storage.import.storageClass"
103

104
        dataImportControllerName    = "dataimportcron-controller"
105
        digestSha256Prefix          = "sha256:"
106
        digestUIDPrefix             = "uid:"
107
        digestDvNameSuffixLength    = 12
108
        cronJobUIDSuffixLength      = 8
109
        defaultImportsToKeepPerCron = 3
110
)
111

112
var ErrNotManagedByCron = errors.New("DataSource is not managed by this DataImportCron")
113

114
// Reconcile loop for DataImportCronReconciler
115
func (r *DataImportCronReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
1✔
116
        dataImportCron := &cdiv1.DataImportCron{}
1✔
117
        if err := r.client.Get(ctx, req.NamespacedName, dataImportCron); cc.IgnoreNotFound(err) != nil {
2✔
118
                return reconcile.Result{}, err
1✔
119
        } else if err != nil || dataImportCron.DeletionTimestamp != nil {
3✔
120
                err := r.cleanup(ctx, req.NamespacedName)
1✔
121
                return reconcile.Result{}, err
1✔
122
        }
1✔
123
        shouldReconcile, err := r.shouldReconcileCron(ctx, dataImportCron)
1✔
124
        if !shouldReconcile || err != nil {
1✔
125
                return reconcile.Result{}, err
×
126
        }
×
127

128
        if err := r.initCron(ctx, dataImportCron); err != nil {
1✔
129
                return reconcile.Result{}, err
×
130
        }
×
131

132
        return r.update(ctx, dataImportCron)
1✔
133
}
134

135
func (r *DataImportCronReconciler) shouldReconcileCron(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
136
        dataSource := &cdiv1.DataSource{}
1✔
137
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: cron.Spec.ManagedDataSource}, dataSource); err != nil {
2✔
138
                if k8serrors.IsNotFound(err) {
2✔
139
                        return true, nil
1✔
140
                }
1✔
141
                return false, err
×
142
        }
143
        dataSourceCronLabel := dataSource.Labels[common.DataImportCronLabel]
1✔
144
        if dataSourceCronLabel == cron.Name || dataSourceCronLabel == "" {
2✔
145
                return true, nil
1✔
146
        }
1✔
147
        otherCron := &cdiv1.DataImportCron{}
1✔
148
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dataSourceCronLabel}, otherCron); err != nil {
2✔
149
                if k8serrors.IsNotFound(err) {
2✔
150
                        return true, nil
1✔
151
                }
1✔
152
                return false, err
×
153
        }
154
        if otherCron.Spec.ManagedDataSource == dataSource.Name {
2✔
155
                msg := fmt.Sprintf(MessageDataSourceAlreadyManaged, dataSource.Name, otherCron.Name)
1✔
156
                r.recorder.Event(cron, corev1.EventTypeWarning, ErrDataSourceAlreadyManaged, msg)
1✔
157
                r.log.V(3).Info(msg)
1✔
158
                return false, nil
1✔
159
        }
1✔
160
        return true, nil
×
161
}
162

163
func (r *DataImportCronReconciler) initCron(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
164
        if dataImportCron.Spec.Schedule == "" {
2✔
165
                return nil
1✔
166
        }
1✔
167
        if isControllerPolledSource(dataImportCron) {
2✔
168
                if dataImportCron.Annotations[AnnNextCronTime] == "" {
2✔
169
                        cc.AddAnnotation(dataImportCron, AnnNextCronTime, time.Now().Format(time.RFC3339))
1✔
170
                }
1✔
171
                return nil
1✔
172
        }
173
        if !isURLSource(dataImportCron) {
1✔
174
                return nil
×
175
        }
×
176
        exists, err := r.cronJobExistsAndUpdated(ctx, dataImportCron)
1✔
177
        if exists || err != nil {
2✔
178
                return err
1✔
179
        }
1✔
180
        cronJob, err := r.newCronJob(dataImportCron)
1✔
181
        if err != nil {
1✔
182
                return err
×
183
        }
×
184
        if err := r.client.Create(ctx, cronJob); err != nil {
1✔
185
                return err
×
186
        }
×
187
        job, err := r.newInitialJob(dataImportCron, cronJob)
1✔
188
        if err != nil {
1✔
189
                return err
×
190
        }
×
191
        if err := r.client.Create(ctx, job); err != nil {
1✔
192
                return err
×
193
        }
×
194
        return nil
1✔
195
}
196

197
func (r *DataImportCronReconciler) getImageStream(ctx context.Context, imageStreamName, imageStreamNamespace string) (*imagev1.ImageStream, string, error) {
1✔
198
        if imageStreamName == "" || imageStreamNamespace == "" {
1✔
199
                return nil, "", errors.Errorf("Missing ImageStream name or namespace")
×
200
        }
×
201
        imageStream := &imagev1.ImageStream{}
1✔
202
        name, tag, err := splitImageStreamName(imageStreamName)
1✔
203
        if err != nil {
2✔
204
                return nil, "", err
1✔
205
        }
1✔
206
        imageStreamNamespacedName := types.NamespacedName{
1✔
207
                Namespace: imageStreamNamespace,
1✔
208
                Name:      name,
1✔
209
        }
1✔
210
        if err := r.client.Get(ctx, imageStreamNamespacedName, imageStream); err != nil {
2✔
211
                return nil, "", err
1✔
212
        }
1✔
213
        return imageStream, tag, nil
1✔
214
}
215

216
func getImageStreamDigest(imageStream *imagev1.ImageStream, imageStreamTag string) (string, string, error) {
1✔
217
        if imageStream == nil {
1✔
218
                return "", "", errors.Errorf("No ImageStream")
×
219
        }
×
220
        tags := imageStream.Status.Tags
1✔
221
        if len(tags) == 0 {
1✔
222
                return "", "", errors.Errorf("ImageStream %s has no tags", imageStream.Name)
×
223
        }
×
224

225
        tagIdx := 0
1✔
226
        if len(imageStreamTag) > 0 {
2✔
227
                tagIdx = -1
1✔
228
                for i, tag := range tags {
2✔
229
                        if tag.Tag == imageStreamTag {
2✔
230
                                tagIdx = i
1✔
231
                                break
1✔
232
                        }
233
                }
234
        }
235
        if tagIdx == -1 {
2✔
236
                return "", "", errors.Errorf("ImageStream %s has no tag %s", imageStream.Name, imageStreamTag)
1✔
237
        }
1✔
238

239
        if len(tags[tagIdx].Items) == 0 {
2✔
240
                return "", "", errors.Errorf("ImageStream %s tag %s has no items", imageStream.Name, imageStreamTag)
1✔
241
        }
1✔
242
        return tags[tagIdx].Items[0].Image, tags[tagIdx].Items[0].DockerImageReference, nil
1✔
243
}
244

245
func splitImageStreamName(imageStreamName string) (string, string, error) {
1✔
246
        if subs := strings.Split(imageStreamName, ":"); len(subs) == 1 {
2✔
247
                return imageStreamName, "", nil
1✔
248
        } else if len(subs) == 2 && len(subs[0]) > 0 && len(subs[1]) > 0 {
3✔
249
                return subs[0], subs[1], nil
1✔
250
        }
1✔
251
        return "", "", errors.Errorf("Illegal ImageStream name %s", imageStreamName)
1✔
252
}
253

254
func (r *DataImportCronReconciler) pollSourceDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
255
        nextTimeStr := dataImportCron.Annotations[AnnNextCronTime]
1✔
256
        if nextTimeStr == "" {
1✔
257
                return r.setNextCronTime(dataImportCron)
×
258
        }
×
259
        nextTime, err := time.Parse(time.RFC3339, nextTimeStr)
1✔
260
        if err != nil {
1✔
261
                return reconcile.Result{}, err
×
262
        }
×
263
        if nextTime.After(time.Now()) {
2✔
264
                return r.setNextCronTime(dataImportCron)
1✔
265
        }
1✔
266
        switch {
1✔
267
        case isImageStreamSource(dataImportCron):
1✔
268
                if err := r.updateImageStreamDesiredDigest(ctx, dataImportCron); err != nil {
2✔
269
                        return reconcile.Result{}, err
1✔
270
                }
1✔
271
        case isPvcSource(dataImportCron):
1✔
272
                if err := r.updatePvcDesiredDigest(ctx, dataImportCron); err != nil {
2✔
273
                        return reconcile.Result{}, err
1✔
274
                }
1✔
275
        case isNodePull(dataImportCron):
1✔
276
                if done, err := r.updateContainerImageDesiredDigest(ctx, dataImportCron); !done {
2✔
277
                        return reconcile.Result{RequeueAfter: 3 * time.Second}, err
1✔
278
                } else if err != nil {
2✔
279
                        return reconcile.Result{}, err
×
280
                }
×
281
        }
282

283
        return r.setNextCronTime(dataImportCron)
1✔
284
}
285

286
func (r *DataImportCronReconciler) setNextCronTime(dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
287
        now := time.Now()
1✔
288
        expr, err := cronexpr.ParseStandard(dataImportCron.Spec.Schedule)
1✔
289
        if err != nil {
1✔
290
                return reconcile.Result{}, err
×
291
        }
×
292
        nextTime := expr.Next(now)
1✔
293
        requeueAfter := nextTime.Sub(now)
1✔
294
        res := reconcile.Result{RequeueAfter: requeueAfter}
1✔
295
        cc.AddAnnotation(dataImportCron, AnnNextCronTime, nextTime.Format(time.RFC3339))
1✔
296
        return res, err
1✔
297
}
298

299
func isImageStreamSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
300
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
301
        return err == nil && regSource.ImageStream != nil
1✔
302
}
1✔
303

304
func isURLSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
305
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
306
        return err == nil && regSource.URL != nil
1✔
307
}
1✔
308

309
func isNodePull(cron *cdiv1.DataImportCron) bool {
1✔
310
        regSource, err := getCronRegistrySource(cron)
1✔
311
        return err == nil && regSource != nil && regSource.PullMethod != nil &&
1✔
312
                *regSource.PullMethod == cdiv1.RegistryPullNode
1✔
313
}
1✔
314

315
func getCronRegistrySource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourceRegistry, error) {
1✔
316
        if !isCronRegistrySource(cron) {
2✔
317
                return nil, errors.Errorf("Cron has no registry source %s", cron.Name)
1✔
318
        }
1✔
319
        return cron.Spec.Template.Spec.Source.Registry, nil
1✔
320
}
321

322
func isCronRegistrySource(cron *cdiv1.DataImportCron) bool {
1✔
323
        source := cron.Spec.Template.Spec.Source
1✔
324
        return source != nil && source.Registry != nil
1✔
325
}
1✔
326

327
func getCronPvcSource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourcePVC, error) {
1✔
328
        if !isPvcSource(cron) {
1✔
329
                return nil, errors.Errorf("Cron has no PVC source %s", cron.Name)
×
330
        }
×
331
        return cron.Spec.Template.Spec.Source.PVC, nil
1✔
332
}
333

334
func isPvcSource(cron *cdiv1.DataImportCron) bool {
1✔
335
        source := cron.Spec.Template.Spec.Source
1✔
336
        return source != nil && source.PVC != nil
1✔
337
}
1✔
338

339
func isControllerPolledSource(cron *cdiv1.DataImportCron) bool {
1✔
340
        return isImageStreamSource(cron) || isPvcSource(cron) || isNodePull(cron)
1✔
341
}
1✔
342

343
func (r *DataImportCronReconciler) update(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
344
        res := reconcile.Result{}
1✔
345

1✔
346
        dv, pvc, err := r.getImportState(ctx, dataImportCron)
1✔
347
        if err != nil {
1✔
348
                return res, err
×
349
        }
×
350

351
        dataImportCronCopy := dataImportCron.DeepCopy()
1✔
352
        imports := dataImportCron.Status.CurrentImports
1✔
353
        importSucceeded := false
1✔
354

1✔
355
        dataVolume := dataImportCron.Spec.Template
1✔
356
        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
1✔
357
        desiredStorageClass, err := cc.GetStorageClassByNameWithVirtFallback(ctx, r.client, explicitScName, dataVolume.Spec.ContentType)
1✔
358
        if err != nil {
1✔
359
                return res, err
×
360
        }
×
361
        if desiredStorageClass != nil {
2✔
362
                if deleted, err := r.deleteOutdatedPendingPvc(ctx, pvc, desiredStorageClass.Name, dataImportCron.Name); deleted || err != nil {
2✔
363
                        return res, err
1✔
364
                }
1✔
365
                currentSc, hasCurrent := dataImportCron.Annotations[AnnStorageClass]
1✔
366
                desiredSc := desiredStorageClass.Name
1✔
367
                if hasCurrent && currentSc != desiredSc {
2✔
368
                        r.log.Info("Storage class changed, delete most recent source on the old sc as it's no longer the desired", "currentSc", currentSc, "desiredSc", desiredSc)
1✔
369
                        if err := r.handleStorageClassChange(ctx, dataImportCron, desiredSc); err != nil {
1✔
370
                                return res, err
×
371
                        }
×
372
                        return reconcile.Result{RequeueAfter: time.Second}, nil
1✔
373
                }
374
                cc.AddAnnotation(dataImportCron, AnnStorageClass, desiredStorageClass.Name)
1✔
375
        }
376
        format, err := r.getSourceFormat(ctx, desiredStorageClass)
1✔
377
        if err != nil {
1✔
378
                return res, err
×
379
        }
×
380
        snapshot, err := r.getSnapshot(ctx, dataImportCron)
1✔
381
        if err != nil {
1✔
382
                return res, err
×
383
        }
×
384

385
        handlePopulatedPvc := func() error {
2✔
386
                if pvc != nil {
2✔
387
                        if err := r.updateSource(ctx, dataImportCron, pvc); err != nil {
1✔
388
                                return err
×
389
                        }
×
390
                }
391
                importSucceeded = true
1✔
392
                if err := r.handleCronFormat(ctx, dataImportCron, pvc, format, desiredStorageClass); err != nil {
1✔
393
                        return err
×
394
                }
×
395

396
                return nil
1✔
397
        }
398

399
        switch {
1✔
400
        case dv != nil:
1✔
401
                switch dv.Status.Phase {
1✔
402
                case cdiv1.Succeeded:
1✔
403
                        if err := handlePopulatedPvc(); err != nil {
1✔
404
                                return res, err
×
405
                        }
×
406
                case cdiv1.ImportScheduled:
1✔
407
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "Import is scheduled", scheduled)
1✔
408
                case cdiv1.ImportInProgress:
1✔
409
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionTrue, "Import is progressing", inProgress)
1✔
410
                default:
1✔
411
                        dvPhase := string(dv.Status.Phase)
1✔
412
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dvPhase), dvPhase)
1✔
413
                }
414
        case pvc != nil && pvc.Status.Phase == corev1.ClaimBound:
1✔
415
                if err := handlePopulatedPvc(); err != nil {
1✔
416
                        return res, err
×
417
                }
×
418
        case snapshot != nil:
1✔
419
                if format == cdiv1.DataImportCronSourceFormatPvc {
1✔
420
                        if err := r.client.Delete(ctx, snapshot); cc.IgnoreNotFound(err) != nil {
×
421
                                return res, err
×
422
                        }
×
423
                        r.log.Info("Snapshot is around even though format switched to PVC, requeueing")
×
424
                        return reconcile.Result{RequeueAfter: time.Second}, nil
×
425
                }
426
                // Below k8s 1.29 there's no way to know the source volume mode
427
                // Let's at least expose this info on our own snapshots
428
                if _, ok := snapshot.Annotations[cc.AnnSourceVolumeMode]; !ok {
2✔
429
                        volMode, err := inferVolumeModeForSnapshot(ctx, r.client, dataImportCron)
1✔
430
                        if err != nil {
1✔
431
                                return res, err
×
432
                        }
×
433
                        if volMode != nil {
2✔
434
                                cc.AddAnnotation(snapshot, cc.AnnSourceVolumeMode, string(*volMode))
1✔
435
                        }
1✔
436
                }
437
                // Copy labels found on dataSource to the existing snapshot in case of upgrades.
438
                dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
439
                if err != nil {
2✔
440
                        if !k8serrors.IsNotFound(err) && !errors.Is(err, ErrNotManagedByCron) {
1✔
441
                                return res, err
×
442
                        }
×
443
                } else {
1✔
444
                        cc.CopyAllowedLabels(dataSource.Labels, snapshot, true)
1✔
445
                }
1✔
446
                if err := r.updateSource(ctx, dataImportCron, snapshot); err != nil {
1✔
447
                        return res, err
×
448
                }
×
449
                importSucceeded = true
1✔
450
        default:
1✔
451
                if len(imports) > 0 {
2✔
452
                        imports = imports[1:]
1✔
453
                        dataImportCron.Status.CurrentImports = imports
1✔
454
                }
1✔
455
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
456
        }
457

458
        if importSucceeded {
2✔
459
                if err := updateDataImportCronOnSuccess(dataImportCron); err != nil {
1✔
460
                        return res, err
×
461
                }
×
462
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
463
                if err := r.garbageCollectOldImports(ctx, dataImportCron); err != nil {
1✔
464
                        return res, err
×
465
                }
×
466
        }
467

468
        if err := r.updateDataSource(ctx, dataImportCron, format); err != nil {
1✔
469
                return res, err
×
470
        }
×
471

472
        // Skip if schedule is disabled
473
        if isControllerPolledSource(dataImportCron) && dataImportCron.Spec.Schedule != "" {
2✔
474
                // We use the poll returned reconcile.Result for RequeueAfter if needed
1✔
475
                pollRes, err := r.pollSourceDigest(ctx, dataImportCron)
1✔
476
                if err != nil {
2✔
477
                        return pollRes, err
1✔
478
                }
1✔
479
                res = pollRes
1✔
480
        }
481

482
        desiredDigest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
483
        digestUpdated := desiredDigest != "" && (len(imports) == 0 || desiredDigest != imports[0].Digest)
1✔
484
        if digestUpdated {
2✔
485
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Source digest updated since last import", outdated)
1✔
486
                if dv != nil {
1✔
487
                        if err := r.deleteErroneousDataVolume(ctx, dataImportCron, dv); err != nil {
×
488
                                return res, err
×
489
                        }
×
490
                }
491
                if importSucceeded || len(imports) == 0 {
2✔
492
                        if err := r.createImportDataVolume(ctx, dataImportCron); err != nil {
2✔
493
                                return res, err
1✔
494
                        }
1✔
495
                }
496
        } else if importSucceeded {
2✔
497
                if err := r.updateDataImportCronSuccessCondition(dataImportCron, format, snapshot); err != nil {
1✔
498
                        return res, err
×
499
                }
×
500
        } else if len(imports) > 0 {
2✔
501
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Import is progressing", inProgress)
1✔
502
        } else {
2✔
503
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "No source digest", noDigest)
1✔
504
        }
1✔
505

506
        if err := updateLastExecutionTimestamp(dataImportCron); err != nil {
1✔
507
                return res, err
×
508
        }
×
509

510
        if !reflect.DeepEqual(dataImportCron, dataImportCronCopy) {
2✔
511
                if err := r.client.Update(ctx, dataImportCron); err != nil {
1✔
512
                        return res, err
×
513
                }
×
514
        }
515
        return res, nil
1✔
516
}
517

518
// Returns the current import DV if exists, and the last imported PVC
519
func (r *DataImportCronReconciler) getImportState(ctx context.Context, cron *cdiv1.DataImportCron) (*cdiv1.DataVolume, *corev1.PersistentVolumeClaim, error) {
1✔
520
        imports := cron.Status.CurrentImports
1✔
521
        if len(imports) == 0 {
2✔
522
                return nil, nil, nil
1✔
523
        }
1✔
524

525
        dvName := imports[0].DataVolumeName
1✔
526
        dv := &cdiv1.DataVolume{}
1✔
527
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, dv); err != nil {
2✔
528
                if !k8serrors.IsNotFound(err) {
1✔
529
                        return nil, nil, err
×
530
                }
×
531
                dv = nil
1✔
532
        }
533

534
        pvc := &corev1.PersistentVolumeClaim{}
1✔
535
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, pvc); err != nil {
2✔
536
                if !k8serrors.IsNotFound(err) {
1✔
537
                        return nil, nil, err
×
538
                }
×
539
                pvc = nil
1✔
540
        }
541
        return dv, pvc, nil
1✔
542
}
543

544
// Returns the current import DV if exists, and the last imported PVC
545
func (r *DataImportCronReconciler) getSnapshot(ctx context.Context, cron *cdiv1.DataImportCron) (*snapshotv1.VolumeSnapshot, error) {
1✔
546
        imports := cron.Status.CurrentImports
1✔
547
        if len(imports) == 0 {
2✔
548
                return nil, nil
1✔
549
        }
1✔
550

551
        snapName := imports[0].DataVolumeName
1✔
552
        snapshot := &snapshotv1.VolumeSnapshot{}
1✔
553
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: snapName}, snapshot); err != nil {
2✔
554
                if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
555
                        return nil, err
×
556
                }
×
557
                return nil, nil
1✔
558
        }
559

560
        return snapshot, nil
1✔
561
}
562

563
func (r *DataImportCronReconciler) getDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (*cdiv1.DataSource, error) {
1✔
564
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
565
        dataSource := &cdiv1.DataSource{}
1✔
566
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dataSourceName}, dataSource); err != nil {
2✔
567
                return nil, err
1✔
568
        }
1✔
569
        if dataSource.Labels[common.DataImportCronLabel] != dataImportCron.Name {
1✔
570
                log := r.log.WithName("getCronManagedDataSource")
×
571
                log.Info("DataSource has no DataImportCron label or is not managed by cron, so it is not updated", "name", dataSourceName, "uid", dataSource.UID, "cron", dataImportCron.Name)
×
572
                return nil, ErrNotManagedByCron
×
573
        }
×
574
        return dataSource, nil
1✔
575
}
576

577
func (r *DataImportCronReconciler) updateSource(ctx context.Context, cron *cdiv1.DataImportCron, obj client.Object) error {
1✔
578
        objCopy := obj.DeepCopyObject()
1✔
579
        cc.AddAnnotation(obj, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
580
        r.setDataImportCronResourceLabels(cron, obj)
1✔
581
        if !reflect.DeepEqual(obj, objCopy) {
2✔
582
                if err := r.client.Update(ctx, obj); err != nil {
1✔
583
                        return err
×
584
                }
×
585
        }
586
        return nil
1✔
587
}
588

589
func (r *DataImportCronReconciler) deleteErroneousDataVolume(ctx context.Context, cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) error {
×
590
        log := r.log.WithValues("name", dv.Name).WithValues("uid", dv.UID)
×
591
        if cond := dvc.FindConditionByType(cdiv1.DataVolumeRunning, dv.Status.Conditions); cond != nil {
×
592
                if cond.Status == corev1.ConditionFalse &&
×
593
                        (cond.Reason == common.GenericError || cond.Reason == ImagePullFailedReason) {
×
594
                        log.Info("Delete DataVolume and reset DesiredDigest due to error", "message", cond.Message)
×
595
                        // Unlabel the DV before deleting it, to eliminate reconcile before DIC is updated
×
596
                        dv.Labels[common.DataImportCronLabel] = ""
×
597
                        if err := r.client.Update(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
598
                                return err
×
599
                        }
×
600
                        if err := r.client.Delete(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
601
                                return err
×
602
                        }
×
603
                        cron.Status.CurrentImports = nil
×
604
                }
605
        }
606
        return nil
×
607
}
608

609
func (r *DataImportCronReconciler) updateImageStreamDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
610
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
1✔
611
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
612
        if err != nil {
1✔
613
                return err
×
614
        }
×
615
        if regSource.ImageStream == nil {
1✔
616
                return nil
×
617
        }
×
618
        imageStream, imageStreamTag, err := r.getImageStream(ctx, *regSource.ImageStream, dataImportCron.Namespace)
1✔
619
        if err != nil {
2✔
620
                return err
1✔
621
        }
1✔
622
        digest, dockerRef, err := getImageStreamDigest(imageStream, imageStreamTag)
1✔
623
        if err != nil {
2✔
624
                return err
1✔
625
        }
1✔
626
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
627
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
628
                log.Info("Updating DataImportCron", "digest", digest)
1✔
629
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
630
                cc.AddAnnotation(dataImportCron, AnnImageStreamDockerRef, dockerRef)
1✔
631
        }
1✔
632
        return nil
1✔
633
}
634

635
func (r *DataImportCronReconciler) updateContainerImageDesiredDigest(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
636
        log := r.log.WithValues("name", cron.Name).WithValues("uid", cron.UID)
1✔
637
        podName := getPollerPodName(cron)
1✔
638
        ns := cron.Namespace
1✔
639
        nn := types.NamespacedName{Name: podName, Namespace: ns}
1✔
640
        pod := &corev1.Pod{}
1✔
641

1✔
642
        if err := r.client.Get(ctx, nn, pod); err == nil {
2✔
643
                digest, err := fetchContainerImageDigest(pod)
1✔
644
                if err != nil || digest == "" {
1✔
645
                        return false, err
×
646
                }
×
647
                cc.AddAnnotation(cron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
648
                if cron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
649
                        log.Info("Updating DataImportCron", "digest", digest)
1✔
650
                        cc.AddAnnotation(cron, AnnSourceDesiredDigest, digest)
1✔
651
                }
1✔
652
                return true, r.client.Delete(ctx, pod)
1✔
653
        } else if cc.IgnoreNotFound(err) != nil {
1✔
654
                return false, err
×
655
        }
×
656

657
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(ctx, r.client)
1✔
658
        if err != nil {
1✔
659
                return false, err
×
660
        }
×
661
        platform := cron.Spec.Template.Spec.Source.Registry.Platform
1✔
662
        if platform != nil && platform.Architecture != "" {
1✔
663
                if workloadNodePlacement.NodeSelector == nil {
×
664
                        workloadNodePlacement.NodeSelector = map[string]string{}
×
665
                }
×
666
                workloadNodePlacement.NodeSelector[corev1.LabelArchStable] = platform.Architecture
×
667
        }
668

669
        containerImage := strings.TrimPrefix(*cron.Spec.Template.Spec.Source.Registry.URL, "docker://")
1✔
670

1✔
671
        pod = &corev1.Pod{
1✔
672
                ObjectMeta: metav1.ObjectMeta{
1✔
673
                        Name:      podName,
1✔
674
                        Namespace: ns,
1✔
675
                        OwnerReferences: []metav1.OwnerReference{
1✔
676
                                {
1✔
677
                                        APIVersion:         cron.APIVersion,
1✔
678
                                        Kind:               cron.Kind,
1✔
679
                                        Name:               cron.Name,
1✔
680
                                        UID:                cron.UID,
1✔
681
                                        BlockOwnerDeletion: ptr.To[bool](true),
1✔
682
                                        Controller:         ptr.To[bool](true),
1✔
683
                                },
1✔
684
                        },
1✔
685
                },
1✔
686
                Spec: corev1.PodSpec{
1✔
687
                        TerminationGracePeriodSeconds: ptr.To[int64](0),
1✔
688
                        RestartPolicy:                 corev1.RestartPolicyNever,
1✔
689
                        NodeSelector:                  workloadNodePlacement.NodeSelector,
1✔
690
                        Tolerations:                   workloadNodePlacement.Tolerations,
1✔
691
                        Affinity:                      workloadNodePlacement.Affinity,
1✔
692
                        Volumes: []corev1.Volume{
1✔
693
                                {
1✔
694
                                        Name: "shared-volume",
1✔
695
                                        VolumeSource: corev1.VolumeSource{
1✔
696
                                                EmptyDir: &corev1.EmptyDirVolumeSource{},
1✔
697
                                        },
1✔
698
                                },
1✔
699
                        },
1✔
700
                        InitContainers: []corev1.Container{
1✔
701
                                {
1✔
702
                                        Name:                     "init",
1✔
703
                                        Image:                    r.image,
1✔
704
                                        ImagePullPolicy:          corev1.PullPolicy(r.pullPolicy),
1✔
705
                                        Command:                  []string{"sh", "-c", "cp /usr/bin/cdi-containerimage-server /shared/server"},
1✔
706
                                        VolumeMounts:             []corev1.VolumeMount{{Name: "shared-volume", MountPath: "/shared"}},
1✔
707
                                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
708
                                },
1✔
709
                        },
1✔
710
                        Containers: []corev1.Container{
1✔
711
                                {
1✔
712
                                        Name:                     "image-container",
1✔
713
                                        Image:                    containerImage,
1✔
714
                                        ImagePullPolicy:          corev1.PullAlways,
1✔
715
                                        Command:                  []string{"/shared/server", "-h"},
1✔
716
                                        VolumeMounts:             []corev1.VolumeMount{{Name: "shared-volume", MountPath: "/shared"}},
1✔
717
                                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
718
                                },
1✔
719
                        },
1✔
720
                },
1✔
721
        }
1✔
722

1✔
723
        cc.SetRestrictedSecurityContext(&pod.Spec)
1✔
724
        if pod.Spec.SecurityContext != nil {
2✔
725
                pod.Spec.SecurityContext.FSGroup = nil
1✔
726
        }
1✔
727

728
        return false, r.client.Create(ctx, pod)
1✔
729
}
730

731
func fetchContainerImageDigest(pod *corev1.Pod) (string, error) {
1✔
732
        if len(pod.Status.ContainerStatuses) == 0 {
1✔
733
                return "", nil
×
734
        }
×
735

736
        status := pod.Status.ContainerStatuses[0]
1✔
737
        if status.State.Waiting != nil {
1✔
738
                reason := status.State.Waiting.Reason
×
739
                switch reason {
×
740
                case "ImagePullBackOff", "ErrImagePull", "InvalidImageName":
×
741
                        return "", errors.Errorf("%s %s: %s", common.ImagePullFailureText, status.Image, reason)
×
742
                }
743
                return "", nil
×
744
        }
745

746
        if status.State.Terminated == nil {
1✔
747
                return "", nil
×
748
        }
×
749

750
        imageID := status.ImageID
1✔
751
        if imageID == "" {
1✔
752
                return "", errors.Errorf("Container has no imageID")
×
753
        }
×
754
        idx := strings.Index(imageID, digestSha256Prefix)
1✔
755
        if idx < 0 {
1✔
756
                return "", errors.Errorf("Container image %s ID has no digest: %s", status.Image, imageID)
×
757
        }
×
758

759
        return imageID[idx:], nil
1✔
760
}
761

762
func (r *DataImportCronReconciler) updatePvcDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
763
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
1✔
764
        pvcSource, err := getCronPvcSource(dataImportCron)
1✔
765
        if err != nil {
1✔
766
                return err
×
767
        }
×
768
        ns := pvcSource.Namespace
1✔
769
        if ns == "" {
2✔
770
                ns = dataImportCron.Namespace
1✔
771
        }
1✔
772
        pvc := &corev1.PersistentVolumeClaim{}
1✔
773
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: pvcSource.Name}, pvc); err != nil {
2✔
774
                return err
1✔
775
        }
1✔
776
        digest := fmt.Sprintf("%s%s", digestUIDPrefix, pvc.UID)
1✔
777
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
778
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
779
                log.Info("Updating DataImportCron", "digest", digest)
1✔
780
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
781
        }
1✔
782
        return nil
1✔
783
}
784

785
func (r *DataImportCronReconciler) updateDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat) error {
1✔
786
        log := r.log.WithName("updateDataSource")
1✔
787
        dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
788
        if err != nil {
2✔
789
                if k8serrors.IsNotFound(err) {
2✔
790
                        dataSource = r.newDataSource(dataImportCron)
1✔
791
                        if err := r.client.Create(ctx, dataSource); err != nil {
1✔
792
                                return err
×
793
                        }
×
794
                        log.Info("DataSource created", "name", dataSource.Name, "uid", dataSource.UID)
1✔
795
                } else if errors.Is(err, ErrNotManagedByCron) {
×
796
                        return nil
×
797
                } else {
×
798
                        return err
×
799
                }
×
800
        }
801
        dataSourceCopy := dataSource.DeepCopy()
1✔
802
        r.setDataImportCronResourceLabels(dataImportCron, dataSource)
1✔
803

1✔
804
        sourcePVC := dataImportCron.Status.LastImportedPVC
1✔
805
        populateDataSource(format, dataSource, sourcePVC)
1✔
806

1✔
807
        if !reflect.DeepEqual(dataSource, dataSourceCopy) {
2✔
808
                if err := r.client.Update(ctx, dataSource); err != nil {
1✔
809
                        return err
×
810
                }
×
811
        }
812

813
        return nil
1✔
814
}
815

816
func populateDataSource(format cdiv1.DataImportCronSourceFormat, dataSource *cdiv1.DataSource, sourcePVC *cdiv1.DataVolumeSourcePVC) {
1✔
817
        if sourcePVC == nil {
2✔
818
                return
1✔
819
        }
1✔
820

821
        switch format {
1✔
822
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
823
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
824
                        PVC: sourcePVC,
1✔
825
                }
1✔
826
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
827
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
828
                        Snapshot: &cdiv1.DataVolumeSourceSnapshot{
1✔
829
                                Namespace: sourcePVC.Namespace,
1✔
830
                                Name:      sourcePVC.Name,
1✔
831
                        },
1✔
832
                }
1✔
833
        }
834
}
835

836
func updateDataImportCronOnSuccess(dataImportCron *cdiv1.DataImportCron) error {
1✔
837
        if dataImportCron.Status.CurrentImports == nil {
1✔
838
                return errors.Errorf("No CurrentImports in cron %s", dataImportCron.Name)
×
839
        }
×
840
        sourcePVC := &cdiv1.DataVolumeSourcePVC{
1✔
841
                Namespace: dataImportCron.Namespace,
1✔
842
                Name:      dataImportCron.Status.CurrentImports[0].DataVolumeName,
1✔
843
        }
1✔
844
        if dataImportCron.Status.LastImportedPVC == nil || *dataImportCron.Status.LastImportedPVC != *sourcePVC {
2✔
845
                dataImportCron.Status.LastImportedPVC = sourcePVC
1✔
846
                now := metav1.Now()
1✔
847
                dataImportCron.Status.LastImportTimestamp = &now
1✔
848
        }
1✔
849
        return nil
1✔
850
}
851

852
func updateLastExecutionTimestamp(cron *cdiv1.DataImportCron) error {
1✔
853
        lastTimeStr := cron.Annotations[AnnLastCronTime]
1✔
854
        if lastTimeStr == "" {
2✔
855
                return nil
1✔
856
        }
1✔
857
        lastTime, err := time.Parse(time.RFC3339, lastTimeStr)
1✔
858
        if err != nil {
1✔
859
                return err
×
860
        }
×
861
        if ts := cron.Status.LastExecutionTimestamp; ts == nil || ts.Time != lastTime {
2✔
862
                cron.Status.LastExecutionTimestamp = &metav1.Time{Time: lastTime}
1✔
863
        }
1✔
864
        return nil
1✔
865
}
866

867
func (r *DataImportCronReconciler) createImportDataVolume(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
868
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
869
        digest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
870
        if digest == "" {
1✔
871
                return nil
×
872
        }
×
873
        dvName, err := createDvName(dataSourceName, digest)
1✔
874
        if err != nil {
2✔
875
                return err
1✔
876
        }
1✔
877

878
        sources := []client.Object{&snapshotv1.VolumeSnapshot{}, &corev1.PersistentVolumeClaim{}}
1✔
879
        for _, src := range sources {
2✔
880
                if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dvName}, src); err != nil {
2✔
881
                        if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
882
                                return err
×
883
                        }
×
884
                } else {
1✔
885
                        if err := r.updateSource(ctx, dataImportCron, src); err != nil {
1✔
886
                                return err
×
887
                        }
×
888
                        // If source exists don't create DV
889
                        dataImportCron.Status.CurrentImports = []cdiv1.ImportStatus{{DataVolumeName: dvName, Digest: digest}}
1✔
890
                        return nil
1✔
891
                }
892
        }
893

894
        dv := r.newSourceDataVolume(dataImportCron, dvName)
1✔
895
        if allowed, err := r.authorizeCloneDataVolume(dataImportCron, dv); err != nil {
1✔
896
                return err
×
897
        } else if !allowed {
2✔
898
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse,
1✔
899
                        "Not authorized to create DataVolume", notAuthorized)
1✔
900
                return nil
1✔
901
        }
1✔
902
        if err := r.client.Create(ctx, dv); err != nil && !k8serrors.IsAlreadyExists(err) {
1✔
903
                return err
×
904
        }
×
905
        dataImportCron.Status.CurrentImports = []cdiv1.ImportStatus{{DataVolumeName: dvName, Digest: digest}}
1✔
906

1✔
907
        return nil
1✔
908
}
909

910
func (r *DataImportCronReconciler) authorizeCloneDataVolume(dataImportCron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) (bool, error) {
1✔
911
        if !isPvcSource(dataImportCron) {
2✔
912
                return true, nil
1✔
913
        }
1✔
914
        saName := "default"
1✔
915
        if dataImportCron.Spec.ServiceAccountName != nil {
1✔
NEW
916
                saName = *dataImportCron.Spec.ServiceAccountName
×
UNCOV
917
        }
×
918
        if resp, err := dv.AuthorizeSA(dv.Namespace, dv.Name, &authProxy{r.client}, dataImportCron.Namespace, saName); err != nil {
1✔
919
                return false, err
×
920
        } else if !resp.Allowed {
2✔
921
                r.log.Info("Not authorized to create DataVolume", "cron", dataImportCron.Name, "reason", resp.Reason)
1✔
922
                return false, nil
1✔
923
        }
1✔
924

925
        return true, nil
1✔
926
}
927

928
type authProxy struct {
929
        client client.Client
930
}
931

932
func (p *authProxy) CreateSar(sar *authorizationv1.SubjectAccessReview) (*authorizationv1.SubjectAccessReview, error) {
1✔
933
        if err := p.client.Create(context.TODO(), sar); err != nil {
1✔
934
                return nil, err
×
935
        }
×
936
        return sar, nil
1✔
937
}
938

939
func (p *authProxy) GetNamespace(name string) (*corev1.Namespace, error) {
1✔
940
        ns := &corev1.Namespace{}
1✔
941
        if err := p.client.Get(context.TODO(), types.NamespacedName{Name: name}, ns); err != nil {
1✔
942
                return nil, err
×
943
        }
×
944
        return ns, nil
1✔
945
}
946

NEW
947
func (p *authProxy) GetDataSource(namespace, name string) (*cdiv1.DataSource, error) {
×
948
        das := &cdiv1.DataSource{}
×
NEW
949
        if err := p.client.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, das); err != nil {
×
950
                return nil, err
×
951
        }
×
952
        return das, nil
×
953
}
954

955
func (r *DataImportCronReconciler) handleStorageClassChange(ctx context.Context, dataImportCron *cdiv1.DataImportCron, desiredStorageClass string) error {
1✔
956
        digest, ok := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
957
        if !ok {
1✔
958
                // nothing to delete
×
959
                return nil
×
960
        }
×
961
        name, err := createDvName(dataImportCron.Spec.ManagedDataSource, digest)
1✔
962
        if err != nil {
1✔
963
                return err
×
964
        }
×
965
        om := metav1.ObjectMeta{Name: name, Namespace: dataImportCron.Namespace}
1✔
966
        sources := []client.Object{&snapshotv1.VolumeSnapshot{ObjectMeta: om}, &cdiv1.DataVolume{ObjectMeta: om}, &corev1.PersistentVolumeClaim{ObjectMeta: om}}
1✔
967
        for _, src := range sources {
2✔
968
                if err := r.client.Delete(ctx, src); cc.IgnoreNotFound(err) != nil {
1✔
969
                        return err
×
970
                }
×
971
        }
972
        for _, src := range sources {
2✔
973
                if err := r.client.Get(ctx, client.ObjectKeyFromObject(src), src); err == nil || !k8serrors.IsNotFound(err) {
1✔
974
                        return fmt.Errorf("waiting for old sources to get cleaned up: %w", err)
×
975
                }
×
976
        }
977
        // Only update desired storage class once garbage collection went through
978
        annPatch := fmt.Sprintf(`[{"op":"add","path":"/metadata/annotations/%s","value":"%s" }]`, openapicommon.EscapeJsonPointer(AnnStorageClass), desiredStorageClass)
1✔
979
        err = r.client.Patch(ctx, dataImportCron, client.RawPatch(types.JSONPatchType, []byte(annPatch)))
1✔
980
        if err != nil {
1✔
981
                return err
×
982
        }
×
983

984
        return nil
1✔
985
}
986

987
func (r *DataImportCronReconciler) handleCronFormat(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, format cdiv1.DataImportCronSourceFormat, desiredStorageClass *storagev1.StorageClass) error {
1✔
988
        switch format {
1✔
989
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
990
                return nil
1✔
991
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
992
                return r.handleSnapshot(ctx, dataImportCron, pvc, desiredStorageClass)
1✔
993
        default:
×
994
                return fmt.Errorf("unknown source format for snapshot")
×
995
        }
996
}
997

998
func (r *DataImportCronReconciler) handleSnapshot(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, desiredStorageClass *storagev1.StorageClass) error {
1✔
999
        if pvc == nil {
1✔
1000
                return nil
×
1001
        }
×
1002
        if sc := pvc.Spec.StorageClassName; sc != nil && *sc != desiredStorageClass.Name {
2✔
1003
                r.log.Info("Attempt to change storage class, will not try making a snapshot of the old PVC")
1✔
1004
                return nil
1✔
1005
        }
1✔
1006
        storageProfile := &cdiv1.StorageProfile{}
1✔
1007
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
1008
                return err
×
1009
        }
×
1010
        className, err := cc.GetSnapshotClassForSmartClone(pvc, &desiredStorageClass.Name, storageProfile.Status.SnapshotClass, r.log, r.client, r.recorder)
1✔
1011
        if err != nil {
1✔
1012
                return err
×
1013
        }
×
1014
        desiredSnapshot := &snapshotv1.VolumeSnapshot{
1✔
1015
                ObjectMeta: metav1.ObjectMeta{
1✔
1016
                        Name:      pvc.Name,
1✔
1017
                        Namespace: dataImportCron.Namespace,
1✔
1018
                        Labels: map[string]string{
1✔
1019
                                common.CDILabelKey:       common.CDILabelValue,
1✔
1020
                                common.CDIComponentLabel: "",
1✔
1021
                        },
1✔
1022
                },
1✔
1023
                Spec: snapshotv1.VolumeSnapshotSpec{
1✔
1024
                        Source: snapshotv1.VolumeSnapshotSource{
1✔
1025
                                PersistentVolumeClaimName: &pvc.Name,
1✔
1026
                        },
1✔
1027
                        VolumeSnapshotClassName: &className,
1✔
1028
                },
1✔
1029
        }
1✔
1030
        r.setDataImportCronResourceLabels(dataImportCron, desiredSnapshot)
1✔
1031
        cc.CopyAllowedLabels(pvc.GetLabels(), desiredSnapshot, false)
1✔
1032

1✔
1033
        currentSnapshot := &snapshotv1.VolumeSnapshot{}
1✔
1034
        if err := r.client.Get(ctx, client.ObjectKeyFromObject(desiredSnapshot), currentSnapshot); err != nil {
2✔
1035
                if !k8serrors.IsNotFound(err) {
1✔
1036
                        return err
×
1037
                }
×
1038
                cc.AddAnnotation(desiredSnapshot, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
1039
                if pvc.Spec.VolumeMode != nil {
2✔
1040
                        cc.AddAnnotation(desiredSnapshot, cc.AnnSourceVolumeMode, string(*pvc.Spec.VolumeMode))
1✔
1041
                }
1✔
1042
                if err := r.client.Create(ctx, desiredSnapshot); err != nil {
1✔
1043
                        return err
×
1044
                }
×
1045
        } else {
1✔
1046
                if cc.IsSnapshotReady(currentSnapshot) {
2✔
1047
                        // Clean up DV/PVC as they are not needed anymore
1✔
1048
                        r.log.Info("Deleting dv/pvc as snapshot is ready", "name", desiredSnapshot.Name)
1✔
1049
                        if err := r.deleteDvPvc(ctx, desiredSnapshot.Name, desiredSnapshot.Namespace); err != nil {
1✔
1050
                                return err
×
1051
                        }
×
1052
                }
1053
        }
1054

1055
        return nil
1✔
1056
}
1057

1058
func (r *DataImportCronReconciler) updateDataImportCronSuccessCondition(dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat, snapshot *snapshotv1.VolumeSnapshot) error {
1✔
1059
        dataImportCron.Status.SourceFormat = &format
1✔
1060

1✔
1061
        switch format {
1✔
1062
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
1063
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
1064
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
1065
                if snapshot == nil {
2✔
1066
                        // Snapshot create/update will trigger reconcile
1✔
1067
                        return nil
1✔
1068
                }
1✔
1069
                if cc.IsSnapshotReady(snapshot) {
2✔
1070
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
1071
                } else {
2✔
1072
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Snapshot of imported data is progressing", inProgress)
1✔
1073
                }
1✔
1074
        default:
×
1075
                return fmt.Errorf("unknown source format for snapshot")
×
1076
        }
1077

1078
        return nil
1✔
1079
}
1080

1081
func (r *DataImportCronReconciler) getSourceFormat(ctx context.Context, desiredStorageClass *storagev1.StorageClass) (cdiv1.DataImportCronSourceFormat, error) {
1✔
1082
        format := cdiv1.DataImportCronSourceFormatPvc
1✔
1083
        if desiredStorageClass == nil {
2✔
1084
                return format, nil
1✔
1085
        }
1✔
1086

1087
        storageProfile := &cdiv1.StorageProfile{}
1✔
1088
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
1089
                return format, err
×
1090
        }
×
1091
        if storageProfile.Status.DataImportCronSourceFormat != nil {
2✔
1092
                format = *storageProfile.Status.DataImportCronSourceFormat
1✔
1093
        }
1✔
1094

1095
        return format, nil
1✔
1096
}
1097

1098
func (r *DataImportCronReconciler) garbageCollectOldImports(ctx context.Context, cron *cdiv1.DataImportCron) error {
1✔
1099
        if cron.Spec.GarbageCollect != nil && *cron.Spec.GarbageCollect != cdiv1.DataImportCronGarbageCollectOutdated {
1✔
1100
                return nil
×
1101
        }
×
1102
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name})
1✔
1103
        if err != nil {
1✔
1104
                return err
×
1105
        }
×
1106

1107
        maxImports := defaultImportsToKeepPerCron
1✔
1108

1✔
1109
        if cron.Spec.ImportsToKeep != nil && *cron.Spec.ImportsToKeep >= 0 {
2✔
1110
                maxImports = int(*cron.Spec.ImportsToKeep)
1✔
1111
        }
1✔
1112

1113
        if err := r.garbageCollectPVCs(ctx, cron.Namespace, cron.Name, selector, maxImports); err != nil {
1✔
1114
                return err
×
1115
        }
×
1116
        if err := r.garbageCollectSnapshots(ctx, cron.Namespace, selector, maxImports); err != nil {
1✔
1117
                return err
×
1118
        }
×
1119

1120
        return nil
1✔
1121
}
1122

1123
func (r *DataImportCronReconciler) garbageCollectPVCs(ctx context.Context, namespace, cronName string, selector labels.Selector, maxImports int) error {
1✔
1124
        pvcList := &corev1.PersistentVolumeClaimList{}
1✔
1125

1✔
1126
        if err := r.client.List(ctx, pvcList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1127
                return err
×
1128
        }
×
1129
        if len(pvcList.Items) > maxImports {
2✔
1130
                sort.Slice(pvcList.Items, func(i, j int) bool {
2✔
1131
                        return pvcList.Items[i].Annotations[AnnLastUseTime] > pvcList.Items[j].Annotations[AnnLastUseTime]
1✔
1132
                })
1✔
1133
                for _, pvc := range pvcList.Items[maxImports:] {
2✔
1134
                        r.log.Info("Deleting dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
1135
                        if err := r.deleteDvPvc(ctx, pvc.Name, pvc.Namespace); err != nil {
1✔
1136
                                return err
×
1137
                        }
×
1138
                }
1139
        }
1140

1141
        dvList := &cdiv1.DataVolumeList{}
1✔
1142
        if err := r.client.List(ctx, dvList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1143
                return err
×
1144
        }
×
1145

1146
        if len(dvList.Items) > maxImports {
2✔
1147
                for _, dv := range dvList.Items {
2✔
1148
                        pvc := &corev1.PersistentVolumeClaim{}
1✔
1149
                        if err := r.client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: dv.Name}, pvc); err != nil {
1✔
1150
                                return err
×
1151
                        }
×
1152

1153
                        if pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
1154
                                r.log.Info("Deleting old version dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
1155
                                if err := r.deleteDvPvc(ctx, dv.Name, dv.Namespace); err != nil {
1✔
1156
                                        return err
×
1157
                                }
×
1158
                        }
1159
                }
1160
        }
1161

1162
        return nil
1✔
1163
}
1164

1165
// deleteDvPvc deletes DV or PVC if DV was GCed
1166
func (r *DataImportCronReconciler) deleteDvPvc(ctx context.Context, name, namespace string) error {
1✔
1167
        om := metav1.ObjectMeta{Name: name, Namespace: namespace}
1✔
1168
        dv := &cdiv1.DataVolume{ObjectMeta: om}
1✔
1169
        if err := r.client.Delete(ctx, dv); err == nil || !k8serrors.IsNotFound(err) {
2✔
1170
                return err
1✔
1171
        }
1✔
1172
        pvc := &corev1.PersistentVolumeClaim{ObjectMeta: om}
1✔
1173
        if err := r.client.Delete(ctx, pvc); err != nil && !k8serrors.IsNotFound(err) {
1✔
1174
                return err
×
1175
        }
×
1176
        return nil
1✔
1177
}
1178

1179
func (r *DataImportCronReconciler) garbageCollectSnapshots(ctx context.Context, namespace string, selector labels.Selector, maxImports int) error {
1✔
1180
        snapList := &snapshotv1.VolumeSnapshotList{}
1✔
1181

1✔
1182
        if err := r.client.List(ctx, snapList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1183
                if meta.IsNoMatchError(err) {
×
1184
                        return nil
×
1185
                }
×
1186
                return err
×
1187
        }
1188
        if len(snapList.Items) > maxImports {
1✔
1189
                sort.Slice(snapList.Items, func(i, j int) bool {
×
1190
                        return snapList.Items[i].Annotations[AnnLastUseTime] > snapList.Items[j].Annotations[AnnLastUseTime]
×
1191
                })
×
1192
                for _, snap := range snapList.Items[maxImports:] {
×
1193
                        r.log.Info("Deleting snapshot", "name", snap.Name, "uid", snap.UID)
×
1194
                        if err := r.client.Delete(ctx, &snap); err != nil && !k8serrors.IsNotFound(err) {
×
1195
                                return err
×
1196
                        }
×
1197
                }
1198
        }
1199

1200
        return nil
1✔
1201
}
1202

1203
func (r *DataImportCronReconciler) cleanup(ctx context.Context, cron types.NamespacedName) error {
1✔
1204
        // Don't keep alerting over a cron thats being deleted, will get set back to 1 again by reconcile loop if needed.
1✔
1205
        metrics.DeleteDataImportCronOutdated(getPrometheusCronLabels(cron.Namespace, cron.Name))
1✔
1206

1✔
1207
        if err := r.deleteJobs(ctx, cron); err != nil {
1✔
1208
                return err
×
1209
        }
×
1210
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name, common.DataImportCronCleanupLabel: "true"})
1✔
1211
        if err != nil {
1✔
1212
                return err
×
1213
        }
×
1214
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: cron.Namespace, LabelSelector: selector}}
1✔
1215
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataSource{}, opts); err != nil {
1✔
1216
                return err
×
1217
        }
×
1218
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataVolume{}, opts); err != nil {
1✔
1219
                return err
×
1220
        }
×
1221
        if err := r.client.DeleteAllOf(ctx, &corev1.PersistentVolumeClaim{}, opts); err != nil {
1✔
1222
                return err
×
1223
        }
×
1224
        if err := r.client.DeleteAllOf(ctx, &snapshotv1.VolumeSnapshot{}, opts); cc.IgnoreIsNoMatchError(err) != nil {
1✔
1225
                return err
×
1226
        }
×
1227
        return nil
1✔
1228
}
1229

1230
func (r *DataImportCronReconciler) deleteJobs(ctx context.Context, cron types.NamespacedName) error {
1✔
1231
        deleteOpts := client.DeleteOptions{PropagationPolicy: ptr.To[metav1.DeletionPropagation](metav1.DeletePropagationBackground)}
1✔
1232
        selector, err := getSelector(map[string]string{common.DataImportCronNsLabel: cron.Namespace, common.DataImportCronLabel: cron.Name})
1✔
1233
        if err != nil {
1✔
1234
                return err
×
1235
        }
×
1236
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: r.cdiNamespace, LabelSelector: selector}, DeleteOptions: deleteOpts}
1✔
1237
        if err := r.client.DeleteAllOf(ctx, &batchv1.CronJob{}, opts); err != nil {
1✔
1238
                return err
×
1239
        }
×
1240
        if err := r.client.DeleteAllOf(ctx, &batchv1.Job{}, opts); err != nil {
1✔
1241
                return err
×
1242
        }
×
1243

1244
        return nil
1✔
1245
}
1246

1247
// NewDataImportCronController creates a new instance of the DataImportCron controller
1248
func NewDataImportCronController(mgr manager.Manager, log logr.Logger, importerImage, pullPolicy string, installerLabels map[string]string) (controller.Controller, error) {
×
1249
        uncachedClient, err := client.New(mgr.GetConfig(), client.Options{
×
1250
                Scheme: mgr.GetScheme(),
×
1251
                Mapper: mgr.GetRESTMapper(),
×
1252
        })
×
1253
        if err != nil {
×
1254
                return nil, err
×
1255
        }
×
1256
        reconciler := &DataImportCronReconciler{
×
1257
                client:          mgr.GetClient(),
×
1258
                uncachedClient:  uncachedClient,
×
1259
                recorder:        mgr.GetEventRecorderFor(dataImportControllerName),
×
1260
                scheme:          mgr.GetScheme(),
×
1261
                log:             log.WithName(dataImportControllerName),
×
1262
                image:           importerImage,
×
1263
                pullPolicy:      pullPolicy,
×
1264
                cdiNamespace:    util.GetNamespace(),
×
1265
                installerLabels: installerLabels,
×
1266
        }
×
1267
        dataImportCronController, err := controller.New(dataImportControllerName, mgr, controller.Options{
×
1268
                MaxConcurrentReconciles: 3,
×
1269
                Reconciler:              reconciler,
×
1270
        })
×
1271
        if err != nil {
×
1272
                return nil, err
×
1273
        }
×
1274
        if err := addDataImportCronControllerWatches(mgr, dataImportCronController); err != nil {
×
1275
                return nil, err
×
1276
        }
×
1277
        log.Info("Initialized DataImportCron controller")
×
1278
        return dataImportCronController, nil
×
1279
}
1280

1281
func getCronName(obj client.Object) string {
×
1282
        return obj.GetLabels()[common.DataImportCronLabel]
×
1283
}
×
1284

1285
func getCronNs(obj client.Object) string {
×
1286
        return obj.GetLabels()[common.DataImportCronNsLabel]
×
1287
}
×
1288

1289
func mapSourceObjectToCron[T client.Object](_ context.Context, obj T) []reconcile.Request {
×
1290
        if cronName := getCronName(obj); cronName != "" {
×
1291
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: cronName, Namespace: obj.GetNamespace()}}}
×
1292
        }
×
1293
        return nil
×
1294
}
1295

1296
func addDataImportCronControllerWatches(mgr manager.Manager, c controller.Controller) error {
×
1297
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataImportCron{}, &handler.TypedEnqueueRequestForObject[*cdiv1.DataImportCron]{})); err != nil {
×
1298
                return err
×
1299
        }
×
1300

1301
        mapStorageProfileToCron := func(ctx context.Context, obj *cdiv1.StorageProfile) []reconcile.Request {
×
1302
                // TODO: Get rid of this after at least one version; use indexer on storage class annotation instead
×
1303
                // Otherwise we risk losing the storage profile event
×
1304
                var crons cdiv1.DataImportCronList
×
1305
                if err := mgr.GetClient().List(ctx, &crons); err != nil {
×
1306
                        c.GetLogger().Error(err, "Unable to list DataImportCrons")
×
1307
                        return nil
×
1308
                }
×
1309
                // Storage profiles are 1:1 to storage classes
1310
                scName := obj.GetName()
×
1311
                var reqs []reconcile.Request
×
1312
                for _, cron := range crons.Items {
×
1313
                        dataVolume := cron.Spec.Template
×
1314
                        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
×
1315
                        templateSc, err := cc.GetStorageClassByNameWithVirtFallback(ctx, mgr.GetClient(), explicitScName, dataVolume.Spec.ContentType)
×
1316
                        if err != nil || templateSc == nil {
×
1317
                                c.GetLogger().Error(err, "Unable to get storage class", "templateSc", templateSc)
×
1318
                                return reqs
×
1319
                        }
×
1320
                        if templateSc.Name == scName {
×
1321
                                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: cron.Namespace, Name: cron.Name}})
×
1322
                        }
×
1323
                }
1324
                return reqs
×
1325
        }
1326

1327
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataVolume{},
×
1328
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataVolume](mapSourceObjectToCron),
×
1329
                predicate.TypedFuncs[*cdiv1.DataVolume]{
×
1330
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataVolume]) bool { return false },
×
1331
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataVolume]) bool { return getCronName(e.ObjectNew) != "" },
×
1332
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataVolume]) bool { return getCronName(e.Object) != "" },
×
1333
                },
1334
        )); err != nil {
×
1335
                return err
×
1336
        }
×
1337

1338
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataSource{},
×
1339
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataSource](mapSourceObjectToCron),
×
1340
                predicate.TypedFuncs[*cdiv1.DataSource]{
×
1341
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataSource]) bool { return false },
×
1342
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataSource]) bool { return getCronName(e.ObjectNew) != "" },
×
1343
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataSource]) bool { return getCronName(e.Object) != "" },
×
1344
                },
1345
        )); err != nil {
×
1346
                return err
×
1347
        }
×
1348

1349
        if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{},
×
1350
                handler.TypedEnqueueRequestsFromMapFunc[*corev1.PersistentVolumeClaim](mapSourceObjectToCron),
×
1351
                predicate.TypedFuncs[*corev1.PersistentVolumeClaim]{
×
1352
                        CreateFunc: func(event.TypedCreateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1353
                        UpdateFunc: func(event.TypedUpdateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1354
                        DeleteFunc: func(e event.TypedDeleteEvent[*corev1.PersistentVolumeClaim]) bool { return getCronName(e.Object) != "" },
×
1355
                },
1356
        )); err != nil {
×
1357
                return err
×
1358
        }
×
1359

1360
        if err := addDefaultStorageClassUpdateWatch(mgr, c); err != nil {
×
1361
                return err
×
1362
        }
×
1363

1364
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.StorageProfile{},
×
1365
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.StorageProfile](mapStorageProfileToCron),
×
1366
                predicate.TypedFuncs[*cdiv1.StorageProfile]{
×
1367
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.StorageProfile]) bool { return true },
×
1368
                        DeleteFunc: func(event.TypedDeleteEvent[*cdiv1.StorageProfile]) bool { return false },
×
1369
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.StorageProfile]) bool {
×
1370
                                return e.ObjectOld.Status.DataImportCronSourceFormat != e.ObjectNew.Status.DataImportCronSourceFormat
×
1371
                        },
×
1372
                },
1373
        )); err != nil {
×
1374
                return err
×
1375
        }
×
1376

1377
        mapCronJobToCron := func(_ context.Context, obj *batchv1.CronJob) []reconcile.Request {
×
1378
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Namespace: getCronNs(obj), Name: getCronName(obj)}}}
×
1379
        }
×
1380

1381
        if err := c.Watch(source.Kind(mgr.GetCache(), &batchv1.CronJob{},
×
1382
                handler.TypedEnqueueRequestsFromMapFunc[*batchv1.CronJob](mapCronJobToCron),
×
1383
                predicate.TypedFuncs[*batchv1.CronJob]{
×
1384
                        CreateFunc: func(e event.TypedCreateEvent[*batchv1.CronJob]) bool {
×
1385
                                return getCronName(e.Object) != "" && getCronNs(e.Object) != ""
×
1386
                        },
×
1387
                        DeleteFunc: func(event.TypedDeleteEvent[*batchv1.CronJob]) bool { return false },
×
1388
                        UpdateFunc: func(event.TypedUpdateEvent[*batchv1.CronJob]) bool { return false },
×
1389
                },
1390
        )); err != nil {
×
1391
                return err
×
1392
        }
×
1393

1394
        if err := mgr.GetClient().List(context.TODO(), &snapshotv1.VolumeSnapshotList{}); err != nil {
×
1395
                if meta.IsNoMatchError(err) {
×
1396
                        // Back out if there's no point to attempt watch
×
1397
                        return nil
×
1398
                }
×
1399
                if !cc.IsErrCacheNotStarted(err) {
×
1400
                        return err
×
1401
                }
×
1402
        }
1403
        if err := c.Watch(source.Kind(mgr.GetCache(), &snapshotv1.VolumeSnapshot{},
×
1404
                handler.TypedEnqueueRequestsFromMapFunc[*snapshotv1.VolumeSnapshot](mapSourceObjectToCron),
×
1405
                predicate.TypedFuncs[*snapshotv1.VolumeSnapshot]{
×
1406
                        CreateFunc: func(event.TypedCreateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1407
                        UpdateFunc: func(event.TypedUpdateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1408
                        DeleteFunc: func(e event.TypedDeleteEvent[*snapshotv1.VolumeSnapshot]) bool { return getCronName(e.Object) != "" },
×
1409
                },
1410
        )); err != nil {
×
1411
                return err
×
1412
        }
×
1413

1414
        return nil
×
1415
}
1416

1417
// addDefaultStorageClassUpdateWatch watches for default/virt default storage class updates
1418
func addDefaultStorageClassUpdateWatch(mgr manager.Manager, c controller.Controller) error {
×
1419
        if err := c.Watch(source.Kind(mgr.GetCache(), &storagev1.StorageClass{},
×
1420
                handler.TypedEnqueueRequestsFromMapFunc[*storagev1.StorageClass](
×
1421
                        func(ctx context.Context, obj *storagev1.StorageClass) []reconcile.Request {
×
1422
                                log := c.GetLogger().WithName("DefaultStorageClassUpdateWatch")
×
1423
                                log.Info("Update", "sc", obj.GetName(),
×
1424
                                        "default", obj.GetAnnotations()[cc.AnnDefaultStorageClass] == "true",
×
1425
                                        "defaultVirt", obj.GetAnnotations()[cc.AnnDefaultVirtStorageClass] == "true")
×
1426
                                reqs, err := getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx, mgr.GetClient())
×
1427
                                if err != nil {
×
1428
                                        log.Error(err, "Failed getting DataImportCrons with pending PVCs")
×
1429
                                }
×
1430
                                return reqs
×
1431
                        },
1432
                ),
1433
                predicate.TypedFuncs[*storagev1.StorageClass]{
1434
                        CreateFunc: func(event.TypedCreateEvent[*storagev1.StorageClass]) bool { return false },
×
1435
                        DeleteFunc: func(event.TypedDeleteEvent[*storagev1.StorageClass]) bool { return false },
×
1436
                        UpdateFunc: func(e event.TypedUpdateEvent[*storagev1.StorageClass]) bool {
×
1437
                                return (e.ObjectNew.Annotations[cc.AnnDefaultStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultStorageClass]) ||
×
1438
                                        (e.ObjectNew.Annotations[cc.AnnDefaultVirtStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultVirtStorageClass])
×
1439
                        },
×
1440
                },
1441
        )); err != nil {
×
1442
                return err
×
1443
        }
×
1444

1445
        return nil
×
1446
}
1447

1448
func getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx context.Context, c client.Client) ([]reconcile.Request, error) {
×
1449
        dicList := &cdiv1.DataImportCronList{}
×
1450
        if err := c.List(ctx, dicList); err != nil {
×
1451
                return nil, err
×
1452
        }
×
1453
        reqs := []reconcile.Request{}
×
1454
        for _, dic := range dicList.Items {
×
1455
                if cc.GetStorageClassFromDVSpec(&dic.Spec.Template) != nil {
×
1456
                        continue
×
1457
                }
1458

1459
                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Name: dic.Name, Namespace: dic.Namespace}})
×
1460
        }
1461

1462
        return reqs, nil
×
1463
}
1464

1465
func (r *DataImportCronReconciler) deleteOutdatedPendingPvc(ctx context.Context, pvc *corev1.PersistentVolumeClaim, desiredStorageClass, cronName string) (bool, error) {
1✔
1466
        if pvc == nil || pvc.Status.Phase != corev1.ClaimPending || pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
1467
                return false, nil
1✔
1468
        }
1✔
1469

1470
        sc := pvc.Spec.StorageClassName
1✔
1471
        if sc == nil || *sc == desiredStorageClass {
2✔
1472
                return false, nil
1✔
1473
        }
1✔
1474

1475
        r.log.Info("Delete pending pvc", "name", pvc.Name, "ns", pvc.Namespace, "sc", *sc)
1✔
1476
        if err := r.client.Delete(ctx, pvc); cc.IgnoreNotFound(err) != nil {
1✔
1477
                return false, err
×
1478
        }
×
1479

1480
        return true, nil
1✔
1481
}
1482

1483
func (r *DataImportCronReconciler) cronJobExistsAndUpdated(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
1484
        cronJob := &batchv1.CronJob{}
1✔
1485
        cronJobKey := types.NamespacedName{Namespace: r.cdiNamespace, Name: GetCronJobName(cron)}
1✔
1486
        if err := r.client.Get(ctx, cronJobKey, cronJob); err != nil {
2✔
1487
                return false, cc.IgnoreNotFound(err)
1✔
1488
        }
1✔
1489

1490
        cronJobCopy := cronJob.DeepCopy()
1✔
1491
        if err := r.initCronJob(cron, cronJobCopy); err != nil {
1✔
1492
                return false, err
×
1493
        }
×
1494

1495
        if !reflect.DeepEqual(cronJob, cronJobCopy) {
2✔
1496
                r.log.Info("Updating CronJob", "name", cronJob.GetName())
1✔
1497
                if err := r.client.Update(ctx, cronJobCopy); err != nil {
1✔
1498
                        return false, cc.IgnoreNotFound(err)
×
1499
                }
×
1500
        }
1501
        return true, nil
1✔
1502
}
1503

1504
func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batchv1.CronJob, error) {
1✔
1505
        cronJob := &batchv1.CronJob{
1✔
1506
                ObjectMeta: metav1.ObjectMeta{
1✔
1507
                        Name:      GetCronJobName(cron),
1✔
1508
                        Namespace: r.cdiNamespace,
1✔
1509
                },
1✔
1510
        }
1✔
1511
        if err := r.initCronJob(cron, cronJob); err != nil {
1✔
1512
                return nil, err
×
1513
        }
×
1514
        return cronJob, nil
1✔
1515
}
1516

1517
// InitPollerPod inits poller Pod
1518
func InitPollerPod(c client.Client, cron *cdiv1.DataImportCron, pod *corev1.PodTemplateSpec, image string, pullPolicy corev1.PullPolicy, log logr.Logger) error {
1✔
1519
        regSource, err := getCronRegistrySource(cron)
1✔
1520
        if err != nil {
1✔
1521
                return err
×
1522
        }
×
1523
        if regSource.URL == nil {
1✔
1524
                return errors.Errorf("No URL source in cron %s", cron.Name)
×
1525
        }
×
1526
        cdiConfig := &cdiv1.CDIConfig{}
1✔
1527
        if err := c.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiConfig); err != nil {
1✔
1528
                return err
×
1529
        }
×
1530
        insecureTLS, err := IsInsecureTLS(*regSource.URL, cdiConfig, log)
1✔
1531
        if err != nil {
1✔
1532
                return err
×
1533
        }
×
1534
        container := corev1.Container{
1✔
1535
                Name:  "cdi-source-update-poller",
1✔
1536
                Image: image,
1✔
1537
                Command: []string{
1✔
1538
                        "/usr/bin/cdi-source-update-poller",
1✔
1539
                        "-ns", cron.Namespace,
1✔
1540
                        "-cron", cron.Name,
1✔
1541
                        "-url", *regSource.URL,
1✔
1542
                },
1✔
1543
                ImagePullPolicy:          pullPolicy,
1✔
1544
                TerminationMessagePath:   corev1.TerminationMessagePathDefault,
1✔
1545
                TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
1546
        }
1✔
1547

1✔
1548
        var volumes []corev1.Volume
1✔
1549
        hasCertConfigMap := regSource.CertConfigMap != nil && *regSource.CertConfigMap != ""
1✔
1550
        if hasCertConfigMap {
1✔
1551
                vm := corev1.VolumeMount{
×
1552
                        Name:      CertVolName,
×
1553
                        MountPath: common.ImporterCertDir,
×
1554
                }
×
1555
                container.VolumeMounts = append(container.VolumeMounts, vm)
×
1556
                container.Command = append(container.Command, "-certdir", common.ImporterCertDir)
×
1557
                volumes = append(volumes, createConfigMapVolume(CertVolName, *regSource.CertConfigMap))
×
1558
        }
×
1559

1560
        if volName, _ := GetImportProxyConfig(cdiConfig, common.ImportProxyConfigMapName); volName != "" {
2✔
1561
                vm := corev1.VolumeMount{
1✔
1562
                        Name:      ProxyCertVolName,
1✔
1563
                        MountPath: common.ImporterProxyCertDir,
1✔
1564
                }
1✔
1565
                container.VolumeMounts = append(container.VolumeMounts, vm)
1✔
1566
                volumes = append(volumes, createConfigMapVolume(ProxyCertVolName, volName))
1✔
1567
        }
1✔
1568

1569
        if regSource.SecretRef != nil && *regSource.SecretRef != "" {
1✔
1570
                container.Env = append(container.Env,
×
1571
                        corev1.EnvVar{
×
1572
                                Name: common.ImporterAccessKeyID,
×
1573
                                ValueFrom: &corev1.EnvVarSource{
×
1574
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1575
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1576
                                                        Name: *regSource.SecretRef,
×
1577
                                                },
×
1578
                                                Key: common.KeyAccess,
×
1579
                                        },
×
1580
                                },
×
1581
                        },
×
1582
                        corev1.EnvVar{
×
1583
                                Name: common.ImporterSecretKey,
×
1584
                                ValueFrom: &corev1.EnvVarSource{
×
1585
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1586
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1587
                                                        Name: *regSource.SecretRef,
×
1588
                                                },
×
1589
                                                Key: common.KeySecret,
×
1590
                                        },
×
1591
                                },
×
1592
                        },
×
1593
                )
×
1594
        }
×
1595

1596
        addEnvVar := func(varName, value string) {
2✔
1597
                container.Env = append(container.Env, corev1.EnvVar{Name: varName, Value: value})
1✔
1598
        }
1✔
1599

1600
        if insecureTLS {
1✔
1601
                addEnvVar(common.InsecureTLSVar, "true")
×
1602
        }
×
1603

1604
        addEnvVarFromImportProxyConfig := func(varName string) {
2✔
1605
                if value, err := GetImportProxyConfig(cdiConfig, varName); err == nil {
2✔
1606
                        addEnvVar(varName, value)
1✔
1607
                }
1✔
1608
        }
1609

1610
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTP)
1✔
1611
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTPS)
1✔
1612
        addEnvVarFromImportProxyConfig(common.ImportProxyNoProxy)
1✔
1613

1✔
1614
        imagePullSecrets, err := cc.GetImagePullSecrets(c)
1✔
1615
        if err != nil {
1✔
1616
                return err
×
1617
        }
×
1618
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(context.TODO(), c)
1✔
1619
        if err != nil {
1✔
1620
                return err
×
1621
        }
×
1622

1623
        podSpec := &pod.Spec
1✔
1624

1✔
1625
        podSpec.RestartPolicy = corev1.RestartPolicyNever
1✔
1626
        podSpec.TerminationGracePeriodSeconds = ptr.To[int64](0)
1✔
1627
        podSpec.Containers = []corev1.Container{container}
1✔
1628
        podSpec.ServiceAccountName = common.CronJobServiceAccountName
1✔
1629
        podSpec.Volumes = volumes
1✔
1630
        podSpec.ImagePullSecrets = imagePullSecrets
1✔
1631
        podSpec.NodeSelector = workloadNodePlacement.NodeSelector
1✔
1632
        podSpec.Tolerations = workloadNodePlacement.Tolerations
1✔
1633
        podSpec.Affinity = workloadNodePlacement.Affinity
1✔
1634

1✔
1635
        cc.SetRestrictedSecurityContext(podSpec)
1✔
1636
        // No need for specifid uid/fsgroup here since this doesn't write or use qemu
1✔
1637
        if podSpec.SecurityContext != nil {
2✔
1638
                podSpec.SecurityContext.FSGroup = nil
1✔
1639
        }
1✔
1640
        if podSpec.Containers[0].SecurityContext != nil {
2✔
1641
                podSpec.Containers[0].SecurityContext.RunAsUser = nil
1✔
1642
        }
1✔
1643

1644
        if pod.Labels == nil {
2✔
1645
                pod.Labels = map[string]string{}
1✔
1646
        }
1✔
1647
        pod.Labels[common.DataImportCronPollerLabel] = ""
1✔
1648

1✔
1649
        return nil
1✔
1650
}
1651

1652
func (r *DataImportCronReconciler) initCronJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) error {
1✔
1653
        cronJobSpec := &cronJob.Spec
1✔
1654
        cronJobSpec.Schedule = cron.Spec.Schedule
1✔
1655
        cronJobSpec.ConcurrencyPolicy = batchv1.ForbidConcurrent
1✔
1656
        cronJobSpec.SuccessfulJobsHistoryLimit = ptr.To[int32](1)
1✔
1657
        cronJobSpec.FailedJobsHistoryLimit = ptr.To[int32](1)
1✔
1658

1✔
1659
        jobSpec := &cronJobSpec.JobTemplate.Spec
1✔
1660
        jobSpec.BackoffLimit = ptr.To[int32](2)
1✔
1661
        jobSpec.TTLSecondsAfterFinished = ptr.To[int32](10)
1✔
1662
        cc.AddAnnotation(&jobSpec.Template, secv1.RequiredSCCAnnotation, common.RestrictedSCCName)
1✔
1663

1✔
1664
        pod := &jobSpec.Template
1✔
1665
        if err := InitPollerPod(r.client, cron, pod, r.image, corev1.PullPolicy(r.pullPolicy), r.log); err != nil {
1✔
1666
                return err
×
1667
        }
×
1668
        if err := r.setJobCommon(cron, cronJob); err != nil {
1✔
1669
                return err
×
1670
        }
×
1671
        return nil
1✔
1672
}
1673

1674
func (r *DataImportCronReconciler) newInitialJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) (*batchv1.Job, error) {
1✔
1675
        job := &batchv1.Job{
1✔
1676
                ObjectMeta: metav1.ObjectMeta{
1✔
1677
                        Name:      GetInitialJobName(cron),
1✔
1678
                        Namespace: cronJob.Namespace,
1✔
1679
                },
1✔
1680
                Spec: cronJob.Spec.JobTemplate.Spec,
1✔
1681
        }
1✔
1682
        if err := r.setJobCommon(cron, job); err != nil {
1✔
1683
                return nil, err
×
1684
        }
×
1685
        return job, nil
1✔
1686
}
1687

1688
func (r *DataImportCronReconciler) setJobCommon(cron *cdiv1.DataImportCron, obj metav1.Object) error {
1✔
1689
        if err := operator.SetOwnerRuntime(r.uncachedClient, obj); err != nil {
1✔
1690
                return err
×
1691
        }
×
1692
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1693
        labels := obj.GetLabels()
1✔
1694
        labels[common.DataImportCronNsLabel] = cron.Namespace
1✔
1695
        labels[common.DataImportCronLabel] = cron.Name
1✔
1696
        obj.SetLabels(labels)
1✔
1697
        return nil
1✔
1698
}
1699

1700
func (r *DataImportCronReconciler) newSourceDataVolume(cron *cdiv1.DataImportCron, dataVolumeName string) *cdiv1.DataVolume {
1✔
1701
        dv := cron.Spec.Template.DeepCopy()
1✔
1702
        if isCronRegistrySource(cron) {
2✔
1703
                var digestedURL string
1✔
1704
                if isURLSource(cron) {
2✔
1705
                        digestedURL = untagDigestedDockerURL(*dv.Spec.Source.Registry.URL + "@" + cron.Annotations[AnnSourceDesiredDigest])
1✔
1706
                } else if isImageStreamSource(cron) {
3✔
1707
                        // No way to import image stream by name when we want specific digest, so we use its docker reference
1✔
1708
                        digestedURL = "docker://" + cron.Annotations[AnnImageStreamDockerRef]
1✔
1709
                        dv.Spec.Source.Registry.ImageStream = nil
1✔
1710
                }
1✔
1711
                dv.Spec.Source.Registry.URL = &digestedURL
1✔
1712
        }
1713
        dv.Name = dataVolumeName
1✔
1714
        dv.Namespace = cron.Namespace
1✔
1715
        r.setDataImportCronResourceLabels(cron, dv)
1✔
1716
        cc.AddAnnotation(dv, cc.AnnImmediateBinding, "true")
1✔
1717
        cc.AddAnnotation(dv, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
1718
        passCronAnnotationToDv(cron, dv, cc.AnnPodRetainAfterCompletion)
1✔
1719

1✔
1720
        for _, defaultInstanceTypeLabel := range cc.DefaultInstanceTypeLabels {
2✔
1721
                passCronLabelToDv(cron, dv, defaultInstanceTypeLabel)
1✔
1722
        }
1✔
1723

1724
        passCronLabelToDv(cron, dv, cc.LabelDynamicCredentialSupport)
1✔
1725

1✔
1726
        return dv
1✔
1727
}
1728

1729
func (r *DataImportCronReconciler) setDataImportCronResourceLabels(cron *cdiv1.DataImportCron, obj metav1.Object) {
1✔
1730
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1731
        cc.CopyAllowedLabels(cron.GetLabels(), obj, true)
1✔
1732
        labels := obj.GetLabels()
1✔
1733
        labels[common.DataImportCronLabel] = cron.Name
1✔
1734
        if cron.Spec.RetentionPolicy != nil && *cron.Spec.RetentionPolicy == cdiv1.DataImportCronRetainNone {
2✔
1735
                labels[common.DataImportCronCleanupLabel] = "true"
1✔
1736
        }
1✔
1737
        obj.SetLabels(labels)
1✔
1738
}
1739

1740
func untagDigestedDockerURL(dockerURL string) string {
1✔
1741
        if u, err := url.Parse(dockerURL); err == nil {
2✔
1742
                url := u.Host + u.Path
1✔
1743
                subs := reference.ReferenceRegexp.FindStringSubmatch(url)
1✔
1744
                // Check for tag
1✔
1745
                if len(subs) > 2 && len(subs[2]) > 0 {
2✔
1746
                        if untaggedRef, err := reference.ParseDockerRef(url); err == nil {
2✔
1747
                                return u.Scheme + "://" + untaggedRef.String()
1✔
1748
                        }
1✔
1749
                }
1750
        }
1751
        return dockerURL
1✔
1752
}
1753

1754
func passCronLabelToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1755
        if val := cron.Labels[ann]; val != "" {
2✔
1756
                cc.AddLabel(dv, ann, val)
1✔
1757
        }
1✔
1758
}
1759

1760
func passCronAnnotationToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1761
        if val := cron.Annotations[ann]; val != "" {
1✔
1762
                cc.AddAnnotation(dv, ann, val)
×
1763
        }
×
1764
}
1765

1766
func (r *DataImportCronReconciler) newDataSource(cron *cdiv1.DataImportCron) *cdiv1.DataSource {
1✔
1767
        dataSource := &cdiv1.DataSource{
1✔
1768
                ObjectMeta: metav1.ObjectMeta{
1✔
1769
                        Name:      cron.Spec.ManagedDataSource,
1✔
1770
                        Namespace: cron.Namespace,
1✔
1771
                },
1✔
1772
        }
1✔
1773
        util.SetRecommendedLabels(dataSource, r.installerLabels, common.CDIControllerName)
1✔
1774
        dataSource.Labels[common.DataImportCronLabel] = cron.Name
1✔
1775
        cc.CopyAllowedLabels(cron.GetLabels(), dataSource, true)
1✔
1776
        return dataSource
1✔
1777
}
1✔
1778

1779
// Create DataVolume name based on the DataSource name + prefix of the digest
1780
func createDvName(prefix, digest string) (string, error) {
1✔
1781
        digestPrefix := ""
1✔
1782
        if strings.HasPrefix(digest, digestSha256Prefix) {
2✔
1783
                digestPrefix = digestSha256Prefix
1✔
1784
        } else if strings.HasPrefix(digest, digestUIDPrefix) {
3✔
1785
                digestPrefix = digestUIDPrefix
1✔
1786
        } else {
2✔
1787
                return "", errors.Errorf("Digest has no supported prefix")
1✔
1788
        }
1✔
1789
        fromIdx := len(digestPrefix)
1✔
1790
        toIdx := fromIdx + digestDvNameSuffixLength
1✔
1791
        if len(digest) < toIdx {
2✔
1792
                return "", errors.Errorf("Digest is too short")
1✔
1793
        }
1✔
1794
        return naming.GetResourceName(prefix, digest[fromIdx:toIdx]), nil
1✔
1795
}
1796

1797
// GetCronJobName get CronJob name based on cron name and UID
1798
func GetCronJobName(cron *cdiv1.DataImportCron) string {
1✔
1799
        return naming.GetResourceName(cron.Name, string(cron.UID)[:cronJobUIDSuffixLength])
1✔
1800
}
1✔
1801

1802
// GetInitialJobName get initial job name based on cron name and UID
1803
func GetInitialJobName(cron *cdiv1.DataImportCron) string {
1✔
1804
        return naming.GetResourceName("initial-job", GetCronJobName(cron))
1✔
1805
}
1✔
1806

1807
func getPollerPodName(cron *cdiv1.DataImportCron) string {
1✔
1808
        return naming.GetResourceName("poller-"+cron.Name, string(cron.UID)[:8])
1✔
1809
}
1✔
1810

1811
func getSelector(matchLabels map[string]string) (labels.Selector, error) {
1✔
1812
        return metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: matchLabels})
1✔
1813
}
1✔
1814

1815
func inferVolumeModeForSnapshot(ctx context.Context, client client.Client, cron *cdiv1.DataImportCron) (*corev1.PersistentVolumeMode, error) {
1✔
1816
        dv := &cron.Spec.Template
1✔
1817

1✔
1818
        if explicitVolumeMode := getVolumeModeFromDVSpec(dv); explicitVolumeMode != nil {
1✔
1819
                return explicitVolumeMode, nil
×
1820
        }
×
1821

1822
        accessModes := getAccessModesFromDVSpec(dv)
1✔
1823
        inferredPvc := &corev1.PersistentVolumeClaim{
1✔
1824
                Spec: corev1.PersistentVolumeClaimSpec{
1✔
1825
                        StorageClassName: cc.GetStorageClassFromDVSpec(dv),
1✔
1826
                        AccessModes:      accessModes,
1✔
1827
                        VolumeMode:       ptr.To(cdiv1.PersistentVolumeFromStorageProfile),
1✔
1828
                        Resources: corev1.VolumeResourceRequirements{
1✔
1829
                                Requests: corev1.ResourceList{
1✔
1830
                                        // Doesn't matter
1✔
1831
                                        corev1.ResourceStorage: resource.MustParse("1Gi"),
1✔
1832
                                },
1✔
1833
                        },
1✔
1834
                },
1✔
1835
        }
1✔
1836
        if err := dvc.RenderPvc(ctx, client, inferredPvc); err != nil {
1✔
1837
                return nil, err
×
1838
        }
×
1839

1840
        return inferredPvc.Spec.VolumeMode, nil
1✔
1841
}
1842

1843
// getVolumeModeFromDVSpec returns the volume mode from DataVolume PVC or Storage spec
1844
func getVolumeModeFromDVSpec(dv *cdiv1.DataVolume) *corev1.PersistentVolumeMode {
1✔
1845
        if dv.Spec.PVC != nil {
1✔
1846
                return dv.Spec.PVC.VolumeMode
×
1847
        }
×
1848

1849
        if dv.Spec.Storage != nil {
2✔
1850
                return dv.Spec.Storage.VolumeMode
1✔
1851
        }
1✔
1852

1853
        return nil
×
1854
}
1855

1856
// getAccessModesFromDVSpec returns the access modes from DataVolume PVC or Storage spec
1857
func getAccessModesFromDVSpec(dv *cdiv1.DataVolume) []corev1.PersistentVolumeAccessMode {
1✔
1858
        if dv.Spec.PVC != nil {
1✔
1859
                return dv.Spec.PVC.AccessModes
×
1860
        }
×
1861

1862
        if dv.Spec.Storage != nil {
2✔
1863
                return dv.Spec.Storage.AccessModes
1✔
1864
        }
1✔
1865

1866
        return nil
×
1867
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc