• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5660

09 Nov 2025 03:37PM UTC coverage: 58.832% (+0.08%) from 58.751%
#5660

Pull #3946

travis-ci

arnongilboa
Authorize DataImportCron PVC clone based on creator UserInfo

Add the DataImportCron spec a CreatedBy field with JSON-marshaled
UserInfo of the user who created the DataImportCron. The field is set by
the mutating webhook and cannot be set by users.

In case of DataImportCron with PVC source, the controller checks the
creator ServiceAccount/User is authorized to clone the source PVC.

Assisted by Cursor AI.

Signed-off-by: Arnon Gilboa <agilboa@redhat.com>
Pull Request #3946: Authorize DataImportCron PVC clone based on creator UserInfo

126 of 189 new or added lines in 8 files covered. (66.67%)

146 existing lines in 1 file now uncovered.

17399 of 29574 relevant lines covered (58.83%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.13
/pkg/controller/dataimportcron-controller.go
1
/*
2
Copyright 2021 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
limitations under the License.
14
See the License for the specific language governing permissions and
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "encoding/json"
22
        "fmt"
23
        "net/url"
24
        "reflect"
25
        "sort"
26
        "strings"
27
        "time"
28

29
        "github.com/containers/image/v5/docker/reference"
30
        "github.com/go-logr/logr"
31
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
32
        imagev1 "github.com/openshift/api/image/v1"
33
        secv1 "github.com/openshift/api/security/v1"
34
        "github.com/pkg/errors"
35
        cronexpr "github.com/robfig/cron/v3"
36

37
        authenticationv1 "k8s.io/api/authentication/v1"
38
        authorizationv1 "k8s.io/api/authorization/v1"
39
        batchv1 "k8s.io/api/batch/v1"
40
        corev1 "k8s.io/api/core/v1"
41
        storagev1 "k8s.io/api/storage/v1"
42
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
43
        "k8s.io/apimachinery/pkg/api/meta"
44
        "k8s.io/apimachinery/pkg/api/resource"
45
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
46
        "k8s.io/apimachinery/pkg/labels"
47
        "k8s.io/apimachinery/pkg/runtime"
48
        "k8s.io/apimachinery/pkg/types"
49
        "k8s.io/client-go/tools/record"
50
        openapicommon "k8s.io/kube-openapi/pkg/common"
51
        "k8s.io/utils/ptr"
52

53
        "sigs.k8s.io/controller-runtime/pkg/client"
54
        "sigs.k8s.io/controller-runtime/pkg/controller"
55
        "sigs.k8s.io/controller-runtime/pkg/event"
56
        "sigs.k8s.io/controller-runtime/pkg/handler"
57
        "sigs.k8s.io/controller-runtime/pkg/manager"
58
        "sigs.k8s.io/controller-runtime/pkg/predicate"
59
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
60
        "sigs.k8s.io/controller-runtime/pkg/source"
61

62
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
63
        "kubevirt.io/containerized-data-importer/pkg/common"
64
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
65
        dvc "kubevirt.io/containerized-data-importer/pkg/controller/datavolume"
66
        metrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-controller"
67
        "kubevirt.io/containerized-data-importer/pkg/operator"
68
        "kubevirt.io/containerized-data-importer/pkg/util"
69
        "kubevirt.io/containerized-data-importer/pkg/util/naming"
70
)
71

72
const (
73
        // ErrDataSourceAlreadyManaged provides a const to indicate DataSource already managed error
74
        ErrDataSourceAlreadyManaged = "ErrDataSourceAlreadyManaged"
75
        // MessageDataSourceAlreadyManaged provides a const to form DataSource already managed error message
76
        MessageDataSourceAlreadyManaged = "DataSource %s is already managed by DataImportCron %s"
77
)
78

79
// DataImportCronReconciler members
80
type DataImportCronReconciler struct {
81
        client          client.Client
82
        uncachedClient  client.Client
83
        recorder        record.EventRecorder
84
        scheme          *runtime.Scheme
85
        log             logr.Logger
86
        image           string
87
        pullPolicy      string
88
        cdiNamespace    string
89
        installerLabels map[string]string
90
}
91

92
const (
93
        // AnnSourceDesiredDigest is the digest of the pending updated image
94
        AnnSourceDesiredDigest = cc.AnnAPIGroup + "/storage.import.sourceDesiredDigest"
95
        // AnnImageStreamDockerRef is the ImageStream Docker reference
96
        AnnImageStreamDockerRef = cc.AnnAPIGroup + "/storage.import.imageStreamDockerRef"
97
        // AnnNextCronTime is the next time stamp which satisfies the cron expression
98
        AnnNextCronTime = cc.AnnAPIGroup + "/storage.import.nextCronTime"
99
        // AnnLastCronTime is the cron last execution time stamp
100
        AnnLastCronTime = cc.AnnAPIGroup + "/storage.import.lastCronTime"
101
        // AnnLastUseTime is the PVC last use time stamp
102
        AnnLastUseTime = cc.AnnAPIGroup + "/storage.import.lastUseTime"
103
        // AnnStorageClass is the cron DV's storage class
104
        AnnStorageClass = cc.AnnAPIGroup + "/storage.import.storageClass"
105

106
        dataImportControllerName    = "dataimportcron-controller"
107
        digestSha256Prefix          = "sha256:"
108
        digestUIDPrefix             = "uid:"
109
        digestDvNameSuffixLength    = 12
110
        cronJobUIDSuffixLength      = 8
111
        defaultImportsToKeepPerCron = 3
112
)
113

114
var ErrNotManagedByCron = errors.New("DataSource is not managed by this DataImportCron")
115

116
// Reconcile loop for DataImportCronReconciler
117
func (r *DataImportCronReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
1✔
118
        dataImportCron := &cdiv1.DataImportCron{}
1✔
119
        if err := r.client.Get(ctx, req.NamespacedName, dataImportCron); cc.IgnoreNotFound(err) != nil {
2✔
120
                return reconcile.Result{}, err
1✔
121
        } else if err != nil || dataImportCron.DeletionTimestamp != nil {
3✔
122
                err := r.cleanup(ctx, req.NamespacedName)
1✔
123
                return reconcile.Result{}, err
1✔
124
        }
1✔
125
        shouldReconcile, err := r.shouldReconcileCron(ctx, dataImportCron)
1✔
126
        if !shouldReconcile || err != nil {
1✔
127
                return reconcile.Result{}, err
×
128
        }
×
129

130
        if err := r.initCron(ctx, dataImportCron); err != nil {
1✔
131
                return reconcile.Result{}, err
×
132
        }
×
133

134
        return r.update(ctx, dataImportCron)
1✔
135
}
136

137
func (r *DataImportCronReconciler) shouldReconcileCron(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
138
        dataSource := &cdiv1.DataSource{}
1✔
139
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: cron.Spec.ManagedDataSource}, dataSource); err != nil {
2✔
140
                if k8serrors.IsNotFound(err) {
2✔
141
                        return true, nil
1✔
142
                }
1✔
143
                return false, err
×
144
        }
145
        dataSourceCronLabel := dataSource.Labels[common.DataImportCronLabel]
1✔
146
        if dataSourceCronLabel == cron.Name || dataSourceCronLabel == "" {
2✔
147
                return true, nil
1✔
148
        }
1✔
149
        otherCron := &cdiv1.DataImportCron{}
1✔
150
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dataSourceCronLabel}, otherCron); err != nil {
2✔
151
                if k8serrors.IsNotFound(err) {
2✔
152
                        return true, nil
1✔
153
                }
1✔
154
                return false, err
×
155
        }
156
        if otherCron.Spec.ManagedDataSource == dataSource.Name {
2✔
157
                msg := fmt.Sprintf(MessageDataSourceAlreadyManaged, dataSource.Name, otherCron.Name)
1✔
158
                r.recorder.Event(cron, corev1.EventTypeWarning, ErrDataSourceAlreadyManaged, msg)
1✔
159
                r.log.V(3).Info(msg)
1✔
160
                return false, nil
1✔
161
        }
1✔
162
        return true, nil
×
163
}
164

165
func (r *DataImportCronReconciler) initCron(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
166
        if dataImportCron.Spec.Schedule == "" {
2✔
167
                return nil
1✔
168
        }
1✔
169
        if isControllerPolledSource(dataImportCron) {
2✔
170
                if dataImportCron.Annotations[AnnNextCronTime] == "" {
2✔
171
                        cc.AddAnnotation(dataImportCron, AnnNextCronTime, time.Now().Format(time.RFC3339))
1✔
172
                }
1✔
173
                return nil
1✔
174
        }
175
        if !isURLSource(dataImportCron) {
1✔
176
                return nil
×
177
        }
×
178
        exists, err := r.cronJobExistsAndUpdated(ctx, dataImportCron)
1✔
179
        if exists || err != nil {
2✔
180
                return err
1✔
181
        }
1✔
182
        cronJob, err := r.newCronJob(dataImportCron)
1✔
183
        if err != nil {
1✔
184
                return err
×
185
        }
×
186
        if err := r.client.Create(ctx, cronJob); err != nil {
1✔
187
                return err
×
188
        }
×
189
        job, err := r.newInitialJob(dataImportCron, cronJob)
1✔
190
        if err != nil {
1✔
191
                return err
×
192
        }
×
193
        if err := r.client.Create(ctx, job); err != nil {
1✔
194
                return err
×
195
        }
×
196
        return nil
1✔
197
}
198

199
func (r *DataImportCronReconciler) getImageStream(ctx context.Context, imageStreamName, imageStreamNamespace string) (*imagev1.ImageStream, string, error) {
1✔
200
        if imageStreamName == "" || imageStreamNamespace == "" {
1✔
201
                return nil, "", errors.Errorf("Missing ImageStream name or namespace")
×
202
        }
×
203
        imageStream := &imagev1.ImageStream{}
1✔
204
        name, tag, err := splitImageStreamName(imageStreamName)
1✔
205
        if err != nil {
2✔
206
                return nil, "", err
1✔
207
        }
1✔
208
        imageStreamNamespacedName := types.NamespacedName{
1✔
209
                Namespace: imageStreamNamespace,
1✔
210
                Name:      name,
1✔
211
        }
1✔
212
        if err := r.client.Get(ctx, imageStreamNamespacedName, imageStream); err != nil {
2✔
213
                return nil, "", err
1✔
214
        }
1✔
215
        return imageStream, tag, nil
1✔
216
}
217

218
func getImageStreamDigest(imageStream *imagev1.ImageStream, imageStreamTag string) (string, string, error) {
1✔
219
        if imageStream == nil {
1✔
220
                return "", "", errors.Errorf("No ImageStream")
×
221
        }
×
222
        tags := imageStream.Status.Tags
1✔
223
        if len(tags) == 0 {
1✔
224
                return "", "", errors.Errorf("ImageStream %s has no tags", imageStream.Name)
×
225
        }
×
226

227
        tagIdx := 0
1✔
228
        if len(imageStreamTag) > 0 {
2✔
229
                tagIdx = -1
1✔
230
                for i, tag := range tags {
2✔
231
                        if tag.Tag == imageStreamTag {
2✔
232
                                tagIdx = i
1✔
233
                                break
1✔
234
                        }
235
                }
236
        }
237
        if tagIdx == -1 {
2✔
238
                return "", "", errors.Errorf("ImageStream %s has no tag %s", imageStream.Name, imageStreamTag)
1✔
239
        }
1✔
240

241
        if len(tags[tagIdx].Items) == 0 {
2✔
242
                return "", "", errors.Errorf("ImageStream %s tag %s has no items", imageStream.Name, imageStreamTag)
1✔
243
        }
1✔
244
        return tags[tagIdx].Items[0].Image, tags[tagIdx].Items[0].DockerImageReference, nil
1✔
245
}
246

247
func splitImageStreamName(imageStreamName string) (string, string, error) {
1✔
248
        if subs := strings.Split(imageStreamName, ":"); len(subs) == 1 {
2✔
249
                return imageStreamName, "", nil
1✔
250
        } else if len(subs) == 2 && len(subs[0]) > 0 && len(subs[1]) > 0 {
3✔
251
                return subs[0], subs[1], nil
1✔
252
        }
1✔
253
        return "", "", errors.Errorf("Illegal ImageStream name %s", imageStreamName)
1✔
254
}
255

256
func (r *DataImportCronReconciler) pollSourceDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
257
        nextTimeStr := dataImportCron.Annotations[AnnNextCronTime]
1✔
258
        if nextTimeStr == "" {
1✔
259
                return r.setNextCronTime(dataImportCron)
×
260
        }
×
261
        nextTime, err := time.Parse(time.RFC3339, nextTimeStr)
1✔
262
        if err != nil {
1✔
263
                return reconcile.Result{}, err
×
264
        }
×
265
        if nextTime.After(time.Now()) {
2✔
266
                return r.setNextCronTime(dataImportCron)
1✔
267
        }
1✔
268
        switch {
1✔
269
        case isImageStreamSource(dataImportCron):
1✔
270
                if err := r.updateImageStreamDesiredDigest(ctx, dataImportCron); err != nil {
2✔
271
                        return reconcile.Result{}, err
1✔
272
                }
1✔
273
        case isPvcSource(dataImportCron):
1✔
274
                if err := r.updatePvcDesiredDigest(ctx, dataImportCron); err != nil {
2✔
275
                        return reconcile.Result{}, err
1✔
276
                }
1✔
277
        case isNodePull(dataImportCron):
1✔
278
                if done, err := r.updateContainerImageDesiredDigest(ctx, dataImportCron); !done {
2✔
279
                        return reconcile.Result{RequeueAfter: 3 * time.Second}, err
1✔
280
                } else if err != nil {
2✔
281
                        return reconcile.Result{}, err
×
282
                }
×
283
        }
284

285
        return r.setNextCronTime(dataImportCron)
1✔
286
}
287

288
func (r *DataImportCronReconciler) setNextCronTime(dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
289
        now := time.Now()
1✔
290
        expr, err := cronexpr.ParseStandard(dataImportCron.Spec.Schedule)
1✔
291
        if err != nil {
1✔
292
                return reconcile.Result{}, err
×
293
        }
×
294
        nextTime := expr.Next(now)
1✔
295
        requeueAfter := nextTime.Sub(now)
1✔
296
        res := reconcile.Result{RequeueAfter: requeueAfter}
1✔
297
        cc.AddAnnotation(dataImportCron, AnnNextCronTime, nextTime.Format(time.RFC3339))
1✔
298
        return res, err
1✔
299
}
300

301
func isImageStreamSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
302
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
303
        return err == nil && regSource.ImageStream != nil
1✔
304
}
1✔
305

306
func isURLSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
307
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
308
        return err == nil && regSource.URL != nil
1✔
309
}
1✔
310

311
func isNodePull(cron *cdiv1.DataImportCron) bool {
1✔
312
        regSource, err := getCronRegistrySource(cron)
1✔
313
        return err == nil && regSource != nil && regSource.PullMethod != nil &&
1✔
314
                *regSource.PullMethod == cdiv1.RegistryPullNode
1✔
315
}
1✔
316

317
func getCronRegistrySource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourceRegistry, error) {
1✔
318
        if !isCronRegistrySource(cron) {
2✔
319
                return nil, errors.Errorf("Cron has no registry source %s", cron.Name)
1✔
320
        }
1✔
321
        return cron.Spec.Template.Spec.Source.Registry, nil
1✔
322
}
323

324
func isCronRegistrySource(cron *cdiv1.DataImportCron) bool {
1✔
325
        source := cron.Spec.Template.Spec.Source
1✔
326
        return source != nil && source.Registry != nil
1✔
327
}
1✔
328

329
func getCronPvcSource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourcePVC, error) {
1✔
330
        if !isPvcSource(cron) {
1✔
331
                return nil, errors.Errorf("Cron has no PVC source %s", cron.Name)
×
332
        }
×
333
        return cron.Spec.Template.Spec.Source.PVC, nil
1✔
334
}
335

336
func isPvcSource(cron *cdiv1.DataImportCron) bool {
1✔
337
        source := cron.Spec.Template.Spec.Source
1✔
338
        return source != nil && source.PVC != nil
1✔
339
}
1✔
340

341
func isControllerPolledSource(cron *cdiv1.DataImportCron) bool {
1✔
342
        return isImageStreamSource(cron) || isPvcSource(cron) || isNodePull(cron)
1✔
343
}
1✔
344

345
func (r *DataImportCronReconciler) update(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
346
        res := reconcile.Result{}
1✔
347

1✔
348
        dv, pvc, err := r.getImportState(ctx, dataImportCron)
1✔
349
        if err != nil {
1✔
350
                return res, err
×
351
        }
×
352

353
        dataImportCronCopy := dataImportCron.DeepCopy()
1✔
354
        imports := dataImportCron.Status.CurrentImports
1✔
355
        importSucceeded := false
1✔
356

1✔
357
        dataVolume := dataImportCron.Spec.Template
1✔
358
        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
1✔
359
        desiredStorageClass, err := cc.GetStorageClassByNameWithVirtFallback(ctx, r.client, explicitScName, dataVolume.Spec.ContentType)
1✔
360
        if err != nil {
1✔
361
                return res, err
×
362
        }
×
363
        if desiredStorageClass != nil {
2✔
364
                if deleted, err := r.deleteOutdatedPendingPvc(ctx, pvc, desiredStorageClass.Name, dataImportCron.Name); deleted || err != nil {
2✔
365
                        return res, err
1✔
366
                }
1✔
367
                currentSc, hasCurrent := dataImportCron.Annotations[AnnStorageClass]
1✔
368
                desiredSc := desiredStorageClass.Name
1✔
369
                if hasCurrent && currentSc != desiredSc {
2✔
370
                        r.log.Info("Storage class changed, delete most recent source on the old sc as it's no longer the desired", "currentSc", currentSc, "desiredSc", desiredSc)
1✔
371
                        if err := r.handleStorageClassChange(ctx, dataImportCron, desiredSc); err != nil {
1✔
372
                                return res, err
×
373
                        }
×
374
                        return reconcile.Result{RequeueAfter: time.Second}, nil
1✔
375
                }
376
                cc.AddAnnotation(dataImportCron, AnnStorageClass, desiredStorageClass.Name)
1✔
377
        }
378
        format, err := r.getSourceFormat(ctx, desiredStorageClass)
1✔
379
        if err != nil {
1✔
380
                return res, err
×
381
        }
×
382
        snapshot, err := r.getSnapshot(ctx, dataImportCron)
1✔
383
        if err != nil {
1✔
384
                return res, err
×
385
        }
×
386

387
        handlePopulatedPvc := func() error {
2✔
388
                if pvc != nil {
2✔
389
                        if err := r.updateSource(ctx, dataImportCron, pvc); err != nil {
1✔
390
                                return err
×
391
                        }
×
392
                }
393
                importSucceeded = true
1✔
394
                if err := r.handleCronFormat(ctx, dataImportCron, pvc, format, desiredStorageClass); err != nil {
1✔
395
                        return err
×
396
                }
×
397

398
                return nil
1✔
399
        }
400

401
        switch {
1✔
402
        case dv != nil:
1✔
403
                switch dv.Status.Phase {
1✔
404
                case cdiv1.Succeeded:
1✔
405
                        if err := handlePopulatedPvc(); err != nil {
1✔
406
                                return res, err
×
407
                        }
×
408
                case cdiv1.ImportScheduled:
1✔
409
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "Import is scheduled", scheduled)
1✔
410
                case cdiv1.ImportInProgress:
1✔
411
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionTrue, "Import is progressing", inProgress)
1✔
412
                default:
1✔
413
                        dvPhase := string(dv.Status.Phase)
1✔
414
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dvPhase), dvPhase)
1✔
415
                }
416
        case pvc != nil && pvc.Status.Phase == corev1.ClaimBound:
1✔
417
                if err := handlePopulatedPvc(); err != nil {
1✔
418
                        return res, err
×
419
                }
×
420
        case snapshot != nil:
1✔
421
                if format == cdiv1.DataImportCronSourceFormatPvc {
1✔
422
                        if err := r.client.Delete(ctx, snapshot); cc.IgnoreNotFound(err) != nil {
×
423
                                return res, err
×
424
                        }
×
425
                        r.log.Info("Snapshot is around even though format switched to PVC, requeueing")
×
426
                        return reconcile.Result{RequeueAfter: time.Second}, nil
×
427
                }
428
                // Below k8s 1.29 there's no way to know the source volume mode
429
                // Let's at least expose this info on our own snapshots
430
                if _, ok := snapshot.Annotations[cc.AnnSourceVolumeMode]; !ok {
2✔
431
                        volMode, err := inferVolumeModeForSnapshot(ctx, r.client, dataImportCron)
1✔
432
                        if err != nil {
1✔
433
                                return res, err
×
434
                        }
×
435
                        if volMode != nil {
2✔
436
                                cc.AddAnnotation(snapshot, cc.AnnSourceVolumeMode, string(*volMode))
1✔
437
                        }
1✔
438
                }
439
                // Copy labels found on dataSource to the existing snapshot in case of upgrades.
440
                dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
441
                if err != nil {
2✔
442
                        if !k8serrors.IsNotFound(err) && !errors.Is(err, ErrNotManagedByCron) {
1✔
443
                                return res, err
×
444
                        }
×
445
                } else {
1✔
446
                        cc.CopyAllowedLabels(dataSource.Labels, snapshot, true)
1✔
447
                }
1✔
448
                if err := r.updateSource(ctx, dataImportCron, snapshot); err != nil {
1✔
449
                        return res, err
×
450
                }
×
451
                importSucceeded = true
1✔
452
        default:
1✔
453
                if len(imports) > 0 {
2✔
454
                        imports = imports[1:]
1✔
455
                        dataImportCron.Status.CurrentImports = imports
1✔
456
                }
1✔
457
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
458
        }
459

460
        if importSucceeded {
2✔
461
                if err := updateDataImportCronOnSuccess(dataImportCron); err != nil {
1✔
462
                        return res, err
×
463
                }
×
464
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
465
                if err := r.garbageCollectOldImports(ctx, dataImportCron); err != nil {
1✔
466
                        return res, err
×
467
                }
×
468
        }
469

470
        if err := r.updateDataSource(ctx, dataImportCron, format); err != nil {
1✔
471
                return res, err
×
472
        }
×
473

474
        // Skip if schedule is disabled
475
        if isControllerPolledSource(dataImportCron) && dataImportCron.Spec.Schedule != "" {
2✔
476
                // We use the poll returned reconcile.Result for RequeueAfter if needed
1✔
477
                pollRes, err := r.pollSourceDigest(ctx, dataImportCron)
1✔
478
                if err != nil {
2✔
479
                        return pollRes, err
1✔
480
                }
1✔
481
                res = pollRes
1✔
482
        }
483

484
        desiredDigest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
485
        digestUpdated := desiredDigest != "" && (len(imports) == 0 || desiredDigest != imports[0].Digest)
1✔
486
        if digestUpdated {
2✔
487
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Source digest updated since last import", outdated)
1✔
488
                if dv != nil {
1✔
489
                        if err := r.deleteErroneousDataVolume(ctx, dataImportCron, dv); err != nil {
×
490
                                return res, err
×
491
                        }
×
492
                }
493
                if importSucceeded || len(imports) == 0 {
2✔
494
                        if err := r.createImportDataVolume(ctx, dataImportCron); err != nil {
2✔
495
                                return res, err
1✔
496
                        }
1✔
497
                }
498
        } else if importSucceeded {
2✔
499
                if err := r.updateDataImportCronSuccessCondition(dataImportCron, format, snapshot); err != nil {
1✔
500
                        return res, err
×
501
                }
×
502
        } else if len(imports) > 0 {
2✔
503
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Import is progressing", inProgress)
1✔
504
        } else {
2✔
505
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "No source digest", noDigest)
1✔
506
        }
1✔
507

508
        if err := updateLastExecutionTimestamp(dataImportCron); err != nil {
1✔
509
                return res, err
×
510
        }
×
511

512
        if !reflect.DeepEqual(dataImportCron, dataImportCronCopy) {
2✔
513
                if err := r.client.Update(ctx, dataImportCron); err != nil {
1✔
514
                        return res, err
×
515
                }
×
516
        }
517
        return res, nil
1✔
518
}
519

520
// Returns the current import DV if exists, and the last imported PVC
521
func (r *DataImportCronReconciler) getImportState(ctx context.Context, cron *cdiv1.DataImportCron) (*cdiv1.DataVolume, *corev1.PersistentVolumeClaim, error) {
1✔
522
        imports := cron.Status.CurrentImports
1✔
523
        if len(imports) == 0 {
2✔
524
                return nil, nil, nil
1✔
525
        }
1✔
526

527
        dvName := imports[0].DataVolumeName
1✔
528
        dv := &cdiv1.DataVolume{}
1✔
529
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, dv); err != nil {
2✔
530
                if !k8serrors.IsNotFound(err) {
1✔
531
                        return nil, nil, err
×
532
                }
×
533
                dv = nil
1✔
534
        }
535

536
        pvc := &corev1.PersistentVolumeClaim{}
1✔
537
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, pvc); err != nil {
2✔
538
                if !k8serrors.IsNotFound(err) {
1✔
539
                        return nil, nil, err
×
540
                }
×
541
                pvc = nil
1✔
542
        }
543
        return dv, pvc, nil
1✔
544
}
545

546
// Returns the current import DV if exists, and the last imported PVC
547
func (r *DataImportCronReconciler) getSnapshot(ctx context.Context, cron *cdiv1.DataImportCron) (*snapshotv1.VolumeSnapshot, error) {
1✔
548
        imports := cron.Status.CurrentImports
1✔
549
        if len(imports) == 0 {
2✔
550
                return nil, nil
1✔
551
        }
1✔
552

553
        snapName := imports[0].DataVolumeName
1✔
554
        snapshot := &snapshotv1.VolumeSnapshot{}
1✔
555
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: snapName}, snapshot); err != nil {
2✔
556
                if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
557
                        return nil, err
×
558
                }
×
559
                return nil, nil
1✔
560
        }
561

562
        return snapshot, nil
1✔
563
}
564

565
func (r *DataImportCronReconciler) getDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (*cdiv1.DataSource, error) {
1✔
566
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
567
        dataSource := &cdiv1.DataSource{}
1✔
568
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dataSourceName}, dataSource); err != nil {
2✔
569
                return nil, err
1✔
570
        }
1✔
571
        if dataSource.Labels[common.DataImportCronLabel] != dataImportCron.Name {
1✔
572
                log := r.log.WithName("getCronManagedDataSource")
×
573
                log.Info("DataSource has no DataImportCron label or is not managed by cron, so it is not updated", "name", dataSourceName, "uid", dataSource.UID, "cron", dataImportCron.Name)
×
574
                return nil, ErrNotManagedByCron
×
575
        }
×
576
        return dataSource, nil
1✔
577
}
578

579
func (r *DataImportCronReconciler) updateSource(ctx context.Context, cron *cdiv1.DataImportCron, obj client.Object) error {
1✔
580
        objCopy := obj.DeepCopyObject()
1✔
581
        cc.AddAnnotation(obj, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
582
        r.setDataImportCronResourceLabels(cron, obj)
1✔
583
        if !reflect.DeepEqual(obj, objCopy) {
2✔
584
                if err := r.client.Update(ctx, obj); err != nil {
1✔
585
                        return err
×
586
                }
×
587
        }
588
        return nil
1✔
589
}
590

591
func (r *DataImportCronReconciler) deleteErroneousDataVolume(ctx context.Context, cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) error {
×
592
        log := r.log.WithValues("name", dv.Name).WithValues("uid", dv.UID)
×
593
        if cond := dvc.FindConditionByType(cdiv1.DataVolumeRunning, dv.Status.Conditions); cond != nil {
×
594
                if cond.Status == corev1.ConditionFalse &&
×
595
                        (cond.Reason == common.GenericError || cond.Reason == ImagePullFailedReason) {
×
596
                        log.Info("Delete DataVolume and reset DesiredDigest due to error", "message", cond.Message)
×
597
                        // Unlabel the DV before deleting it, to eliminate reconcile before DIC is updated
×
598
                        dv.Labels[common.DataImportCronLabel] = ""
×
599
                        if err := r.client.Update(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
600
                                return err
×
601
                        }
×
602
                        if err := r.client.Delete(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
603
                                return err
×
604
                        }
×
605
                        cron.Status.CurrentImports = nil
×
606
                }
607
        }
608
        return nil
×
609
}
610

611
func (r *DataImportCronReconciler) updateImageStreamDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
612
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
1✔
613
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
614
        if err != nil {
1✔
615
                return err
×
616
        }
×
617
        if regSource.ImageStream == nil {
1✔
618
                return nil
×
619
        }
×
620
        imageStream, imageStreamTag, err := r.getImageStream(ctx, *regSource.ImageStream, dataImportCron.Namespace)
1✔
621
        if err != nil {
2✔
622
                return err
1✔
623
        }
1✔
624
        digest, dockerRef, err := getImageStreamDigest(imageStream, imageStreamTag)
1✔
625
        if err != nil {
2✔
626
                return err
1✔
627
        }
1✔
628
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
629
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
630
                log.Info("Updating DataImportCron", "digest", digest)
1✔
631
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
632
                cc.AddAnnotation(dataImportCron, AnnImageStreamDockerRef, dockerRef)
1✔
633
        }
1✔
634
        return nil
1✔
635
}
636

637
func (r *DataImportCronReconciler) updateContainerImageDesiredDigest(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
638
        log := r.log.WithValues("name", cron.Name).WithValues("uid", cron.UID)
1✔
639
        podName := getPollerPodName(cron)
1✔
640
        ns := cron.Namespace
1✔
641
        nn := types.NamespacedName{Name: podName, Namespace: ns}
1✔
642
        pod := &corev1.Pod{}
1✔
643

1✔
644
        if err := r.client.Get(ctx, nn, pod); err == nil {
2✔
645
                digest, err := fetchContainerImageDigest(pod)
1✔
646
                if err != nil || digest == "" {
1✔
647
                        return false, err
×
648
                }
×
649
                cc.AddAnnotation(cron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
650
                if cron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
651
                        log.Info("Updating DataImportCron", "digest", digest)
1✔
652
                        cc.AddAnnotation(cron, AnnSourceDesiredDigest, digest)
1✔
653
                }
1✔
654
                return true, r.client.Delete(ctx, pod)
1✔
655
        } else if cc.IgnoreNotFound(err) != nil {
1✔
656
                return false, err
×
657
        }
×
658

659
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(ctx, r.client)
1✔
660
        if err != nil {
1✔
661
                return false, err
×
662
        }
×
663
        platform := cron.Spec.Template.Spec.Source.Registry.Platform
1✔
664
        if platform != nil && platform.Architecture != "" {
1✔
665
                if workloadNodePlacement.NodeSelector == nil {
×
666
                        workloadNodePlacement.NodeSelector = map[string]string{}
×
667
                }
×
668
                workloadNodePlacement.NodeSelector[corev1.LabelArchStable] = platform.Architecture
×
669
        }
670

671
        containerImage := strings.TrimPrefix(*cron.Spec.Template.Spec.Source.Registry.URL, "docker://")
1✔
672

1✔
673
        pod = &corev1.Pod{
1✔
674
                ObjectMeta: metav1.ObjectMeta{
1✔
675
                        Name:      podName,
1✔
676
                        Namespace: ns,
1✔
677
                        OwnerReferences: []metav1.OwnerReference{
1✔
678
                                {
1✔
679
                                        APIVersion:         cron.APIVersion,
1✔
680
                                        Kind:               cron.Kind,
1✔
681
                                        Name:               cron.Name,
1✔
682
                                        UID:                cron.UID,
1✔
683
                                        BlockOwnerDeletion: ptr.To[bool](true),
1✔
684
                                        Controller:         ptr.To[bool](true),
1✔
685
                                },
1✔
686
                        },
1✔
687
                },
1✔
688
                Spec: corev1.PodSpec{
1✔
689
                        TerminationGracePeriodSeconds: ptr.To[int64](0),
1✔
690
                        RestartPolicy:                 corev1.RestartPolicyNever,
1✔
691
                        NodeSelector:                  workloadNodePlacement.NodeSelector,
1✔
692
                        Tolerations:                   workloadNodePlacement.Tolerations,
1✔
693
                        Affinity:                      workloadNodePlacement.Affinity,
1✔
694
                        Volumes: []corev1.Volume{
1✔
695
                                {
1✔
696
                                        Name: "shared-volume",
1✔
697
                                        VolumeSource: corev1.VolumeSource{
1✔
698
                                                EmptyDir: &corev1.EmptyDirVolumeSource{},
1✔
699
                                        },
1✔
700
                                },
1✔
701
                        },
1✔
702
                        InitContainers: []corev1.Container{
1✔
703
                                {
1✔
704
                                        Name:                     "init",
1✔
705
                                        Image:                    r.image,
1✔
706
                                        ImagePullPolicy:          corev1.PullPolicy(r.pullPolicy),
1✔
707
                                        Command:                  []string{"sh", "-c", "cp /usr/bin/cdi-containerimage-server /shared/server"},
1✔
708
                                        VolumeMounts:             []corev1.VolumeMount{{Name: "shared-volume", MountPath: "/shared"}},
1✔
709
                                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
710
                                },
1✔
711
                        },
1✔
712
                        Containers: []corev1.Container{
1✔
713
                                {
1✔
714
                                        Name:                     "image-container",
1✔
715
                                        Image:                    containerImage,
1✔
716
                                        ImagePullPolicy:          corev1.PullAlways,
1✔
717
                                        Command:                  []string{"/shared/server", "-h"},
1✔
718
                                        VolumeMounts:             []corev1.VolumeMount{{Name: "shared-volume", MountPath: "/shared"}},
1✔
719
                                        TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
720
                                },
1✔
721
                        },
1✔
722
                },
1✔
723
        }
1✔
724

1✔
725
        cc.SetRestrictedSecurityContext(&pod.Spec)
1✔
726
        if pod.Spec.SecurityContext != nil {
2✔
727
                pod.Spec.SecurityContext.FSGroup = nil
1✔
728
        }
1✔
729

730
        return false, r.client.Create(ctx, pod)
1✔
731
}
732

733
func fetchContainerImageDigest(pod *corev1.Pod) (string, error) {
1✔
734
        if len(pod.Status.ContainerStatuses) == 0 {
1✔
735
                return "", nil
×
736
        }
×
737

738
        status := pod.Status.ContainerStatuses[0]
1✔
739
        if status.State.Waiting != nil {
1✔
740
                reason := status.State.Waiting.Reason
×
741
                switch reason {
×
742
                case "ImagePullBackOff", "ErrImagePull", "InvalidImageName":
×
743
                        return "", errors.Errorf("%s %s: %s", common.ImagePullFailureText, status.Image, reason)
×
744
                }
745
                return "", nil
×
746
        }
747

748
        if status.State.Terminated == nil {
1✔
749
                return "", nil
×
750
        }
×
751

752
        imageID := status.ImageID
1✔
753
        if imageID == "" {
1✔
754
                return "", errors.Errorf("Container has no imageID")
×
755
        }
×
756
        idx := strings.Index(imageID, digestSha256Prefix)
1✔
757
        if idx < 0 {
1✔
758
                return "", errors.Errorf("Container image %s ID has no digest: %s", status.Image, imageID)
×
759
        }
×
760

761
        return imageID[idx:], nil
1✔
762
}
763

764
func (r *DataImportCronReconciler) updatePvcDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
765
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
1✔
766
        pvcSource, err := getCronPvcSource(dataImportCron)
1✔
767
        if err != nil {
1✔
768
                return err
×
769
        }
×
770
        ns := pvcSource.Namespace
1✔
771
        if ns == "" {
2✔
772
                ns = dataImportCron.Namespace
1✔
773
        }
1✔
774
        pvc := &corev1.PersistentVolumeClaim{}
1✔
775
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: ns, Name: pvcSource.Name}, pvc); err != nil {
2✔
776
                return err
1✔
777
        }
1✔
778
        digest := fmt.Sprintf("%s%s", digestUIDPrefix, pvc.UID)
1✔
779
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
780
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
781
                log.Info("Updating DataImportCron", "digest", digest)
1✔
782
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
783
        }
1✔
784
        return nil
1✔
785
}
786

787
func (r *DataImportCronReconciler) updateDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat) error {
1✔
788
        log := r.log.WithName("updateDataSource")
1✔
789
        dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
790
        if err != nil {
2✔
791
                if k8serrors.IsNotFound(err) {
2✔
792
                        dataSource = r.newDataSource(dataImportCron)
1✔
793
                        if err := r.client.Create(ctx, dataSource); err != nil {
1✔
794
                                return err
×
795
                        }
×
796
                        log.Info("DataSource created", "name", dataSource.Name, "uid", dataSource.UID)
1✔
797
                } else if errors.Is(err, ErrNotManagedByCron) {
×
798
                        return nil
×
799
                } else {
×
800
                        return err
×
801
                }
×
802
        }
803
        dataSourceCopy := dataSource.DeepCopy()
1✔
804
        r.setDataImportCronResourceLabels(dataImportCron, dataSource)
1✔
805

1✔
806
        sourcePVC := dataImportCron.Status.LastImportedPVC
1✔
807
        populateDataSource(format, dataSource, sourcePVC)
1✔
808

1✔
809
        if !reflect.DeepEqual(dataSource, dataSourceCopy) {
2✔
810
                if err := r.client.Update(ctx, dataSource); err != nil {
1✔
811
                        return err
×
812
                }
×
813
        }
814

815
        return nil
1✔
816
}
817

818
func populateDataSource(format cdiv1.DataImportCronSourceFormat, dataSource *cdiv1.DataSource, sourcePVC *cdiv1.DataVolumeSourcePVC) {
1✔
819
        if sourcePVC == nil {
2✔
820
                return
1✔
821
        }
1✔
822

823
        switch format {
1✔
824
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
825
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
826
                        PVC: sourcePVC,
1✔
827
                }
1✔
828
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
829
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
830
                        Snapshot: &cdiv1.DataVolumeSourceSnapshot{
1✔
831
                                Namespace: sourcePVC.Namespace,
1✔
832
                                Name:      sourcePVC.Name,
1✔
833
                        },
1✔
834
                }
1✔
835
        }
836
}
837

838
func updateDataImportCronOnSuccess(dataImportCron *cdiv1.DataImportCron) error {
1✔
839
        if dataImportCron.Status.CurrentImports == nil {
1✔
840
                return errors.Errorf("No CurrentImports in cron %s", dataImportCron.Name)
×
841
        }
×
842
        sourcePVC := &cdiv1.DataVolumeSourcePVC{
1✔
843
                Namespace: dataImportCron.Namespace,
1✔
844
                Name:      dataImportCron.Status.CurrentImports[0].DataVolumeName,
1✔
845
        }
1✔
846
        if dataImportCron.Status.LastImportedPVC == nil || *dataImportCron.Status.LastImportedPVC != *sourcePVC {
2✔
847
                dataImportCron.Status.LastImportedPVC = sourcePVC
1✔
848
                now := metav1.Now()
1✔
849
                dataImportCron.Status.LastImportTimestamp = &now
1✔
850
        }
1✔
851
        return nil
1✔
852
}
853

854
func updateLastExecutionTimestamp(cron *cdiv1.DataImportCron) error {
1✔
855
        lastTimeStr := cron.Annotations[AnnLastCronTime]
1✔
856
        if lastTimeStr == "" {
2✔
857
                return nil
1✔
858
        }
1✔
859
        lastTime, err := time.Parse(time.RFC3339, lastTimeStr)
1✔
860
        if err != nil {
1✔
861
                return err
×
862
        }
×
863
        if ts := cron.Status.LastExecutionTimestamp; ts == nil || ts.Time != lastTime {
2✔
864
                cron.Status.LastExecutionTimestamp = &metav1.Time{Time: lastTime}
1✔
865
        }
1✔
866
        return nil
1✔
867
}
868

869
func (r *DataImportCronReconciler) createImportDataVolume(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
870
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
871
        digest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
872
        if digest == "" {
1✔
873
                return nil
×
874
        }
×
875
        dvName, err := createDvName(dataSourceName, digest)
1✔
876
        if err != nil {
2✔
877
                return err
1✔
878
        }
1✔
879
        dataImportCron.Status.CurrentImports = []cdiv1.ImportStatus{{DataVolumeName: dvName, Digest: digest}}
1✔
880

1✔
881
        sources := []client.Object{&snapshotv1.VolumeSnapshot{}, &corev1.PersistentVolumeClaim{}}
1✔
882
        for _, src := range sources {
2✔
883
                if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dvName}, src); err != nil {
2✔
884
                        if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
885
                                return err
×
886
                        }
×
887
                } else {
1✔
888
                        if err := r.updateSource(ctx, dataImportCron, src); err != nil {
1✔
889
                                return err
×
890
                        }
×
891
                        // If source exists don't create DV
892
                        return nil
1✔
893
                }
894
        }
895
        dv := r.newSourceDataVolume(dataImportCron, dvName)
1✔
896
        if allowed, err := r.authorizeCloneDataVolume(dataImportCron, dv); err != nil {
1✔
NEW
897
                return err
×
898
        } else if !allowed {
2✔
899
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse,
1✔
900
                        "Not authorized to create DataVolume", notAuthorized)
1✔
901
                return nil
1✔
902
        }
1✔
903
        if err := r.client.Create(ctx, dv); err != nil && !k8serrors.IsAlreadyExists(err) {
1✔
904
                return err
×
905
        }
×
906

907
        return nil
1✔
908
}
909

910
func (r *DataImportCronReconciler) authorizeCloneDataVolume(dataImportCron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) (bool, error) {
1✔
911
        if !isPvcSource(dataImportCron) {
2✔
912
                return true, nil
1✔
913
        }
1✔
914

915
        createdBy := dataImportCron.Spec.CreatedBy
1✔
916
        if createdBy == nil {
2✔
917
                r.log.Info("Not authorized to create DataVolume without CreatedBy", "cron", dataImportCron.Name)
1✔
918
                return false, nil
1✔
919
        }
1✔
920

NEW
921
        var userInfo authenticationv1.UserInfo
×
NEW
922
        if err := json.Unmarshal([]byte(*createdBy), &userInfo); err != nil {
×
NEW
923
                return false, err
×
NEW
924
        }
×
925

NEW
926
        var resp cdiv1.CloneAuthResponse
×
NEW
927
        var err error
×
NEW
928
        if saNamespace, saName, ok := parseServiceAccount(userInfo.Username); ok {
×
NEW
929
                r.log.Info("Using creator ServiceAccount for authorization", "namespace", saNamespace, "name", saName, "cron", dataImportCron.Name)
×
NEW
930
                resp, err = dv.AuthorizeSA(dv.Namespace, dv.Name, r, saNamespace, saName)
×
NEW
931
        } else {
×
NEW
932
                r.log.Info("Using creator User for authorization", "username", userInfo.Username, "cron", dataImportCron.Name)
×
NEW
933
                resp, err = dv.AuthorizeUser(dv.Namespace, dv.Name, r, userInfo)
×
NEW
934
        }
×
935

NEW
936
        if err != nil {
×
NEW
937
                return false, err
×
NEW
938
        }
×
NEW
939
        if !resp.Allowed {
×
NEW
940
                r.log.Info("Not authorized to create DataVolume", "cron", dataImportCron.Name, "reason", resp.Reason)
×
NEW
941
                return false, nil
×
NEW
942
        }
×
943

NEW
944
        return true, nil
×
945
}
946

NEW
947
func (r *DataImportCronReconciler) CreateSar(sar *authorizationv1.SubjectAccessReview) (*authorizationv1.SubjectAccessReview, error) {
×
NEW
948
        if err := r.client.Create(context.TODO(), sar); err != nil {
×
NEW
949
                return nil, err
×
NEW
950
        }
×
NEW
951
        return sar, nil
×
952
}
953

NEW
954
func (r *DataImportCronReconciler) GetNamespace(name string) (*corev1.Namespace, error) {
×
NEW
955
        return nil, nil
×
NEW
956
}
×
957

NEW
958
func (r *DataImportCronReconciler) GetDataSource(namespace, name string) (*cdiv1.DataSource, error) {
×
NEW
959
        return nil, nil
×
NEW
960
}
×
961

962
func (r *DataImportCronReconciler) handleStorageClassChange(ctx context.Context, dataImportCron *cdiv1.DataImportCron, desiredStorageClass string) error {
1✔
963
        digest, ok := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
964
        if !ok {
1✔
965
                // nothing to delete
×
966
                return nil
×
967
        }
×
968
        name, err := createDvName(dataImportCron.Spec.ManagedDataSource, digest)
1✔
969
        if err != nil {
1✔
UNCOV
970
                return err
×
UNCOV
971
        }
×
972
        om := metav1.ObjectMeta{Name: name, Namespace: dataImportCron.Namespace}
1✔
973
        sources := []client.Object{&snapshotv1.VolumeSnapshot{ObjectMeta: om}, &cdiv1.DataVolume{ObjectMeta: om}, &corev1.PersistentVolumeClaim{ObjectMeta: om}}
1✔
974
        for _, src := range sources {
2✔
975
                if err := r.client.Delete(ctx, src); cc.IgnoreNotFound(err) != nil {
1✔
UNCOV
976
                        return err
×
UNCOV
977
                }
×
978
        }
979
        for _, src := range sources {
2✔
980
                if err := r.client.Get(ctx, client.ObjectKeyFromObject(src), src); err == nil || !k8serrors.IsNotFound(err) {
1✔
UNCOV
981
                        return fmt.Errorf("waiting for old sources to get cleaned up: %w", err)
×
UNCOV
982
                }
×
983
        }
984
        // Only update desired storage class once garbage collection went through
985
        annPatch := fmt.Sprintf(`[{"op":"add","path":"/metadata/annotations/%s","value":"%s" }]`, openapicommon.EscapeJsonPointer(AnnStorageClass), desiredStorageClass)
1✔
986
        err = r.client.Patch(ctx, dataImportCron, client.RawPatch(types.JSONPatchType, []byte(annPatch)))
1✔
987
        if err != nil {
1✔
UNCOV
988
                return err
×
989
        }
×
990

991
        return nil
1✔
992
}
993

994
func (r *DataImportCronReconciler) handleCronFormat(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, format cdiv1.DataImportCronSourceFormat, desiredStorageClass *storagev1.StorageClass) error {
1✔
995
        switch format {
1✔
996
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
997
                return nil
1✔
998
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
999
                return r.handleSnapshot(ctx, dataImportCron, pvc, desiredStorageClass)
1✔
UNCOV
1000
        default:
×
UNCOV
1001
                return fmt.Errorf("unknown source format for snapshot")
×
1002
        }
1003
}
1004

1005
func (r *DataImportCronReconciler) handleSnapshot(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, desiredStorageClass *storagev1.StorageClass) error {
1✔
1006
        if pvc == nil {
1✔
UNCOV
1007
                return nil
×
1008
        }
×
1009
        if sc := pvc.Spec.StorageClassName; sc != nil && *sc != desiredStorageClass.Name {
2✔
1010
                r.log.Info("Attempt to change storage class, will not try making a snapshot of the old PVC")
1✔
1011
                return nil
1✔
1012
        }
1✔
1013
        storageProfile := &cdiv1.StorageProfile{}
1✔
1014
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
1015
                return err
×
1016
        }
×
1017
        className, err := cc.GetSnapshotClassForSmartClone(pvc, &desiredStorageClass.Name, storageProfile.Status.SnapshotClass, r.log, r.client, r.recorder)
1✔
1018
        if err != nil {
1✔
UNCOV
1019
                return err
×
UNCOV
1020
        }
×
1021
        desiredSnapshot := &snapshotv1.VolumeSnapshot{
1✔
1022
                ObjectMeta: metav1.ObjectMeta{
1✔
1023
                        Name:      pvc.Name,
1✔
1024
                        Namespace: dataImportCron.Namespace,
1✔
1025
                        Labels: map[string]string{
1✔
1026
                                common.CDILabelKey:       common.CDILabelValue,
1✔
1027
                                common.CDIComponentLabel: "",
1✔
1028
                        },
1✔
1029
                },
1✔
1030
                Spec: snapshotv1.VolumeSnapshotSpec{
1✔
1031
                        Source: snapshotv1.VolumeSnapshotSource{
1✔
1032
                                PersistentVolumeClaimName: &pvc.Name,
1✔
1033
                        },
1✔
1034
                        VolumeSnapshotClassName: &className,
1✔
1035
                },
1✔
1036
        }
1✔
1037
        r.setDataImportCronResourceLabels(dataImportCron, desiredSnapshot)
1✔
1038
        cc.CopyAllowedLabels(pvc.GetLabels(), desiredSnapshot, false)
1✔
1039

1✔
1040
        currentSnapshot := &snapshotv1.VolumeSnapshot{}
1✔
1041
        if err := r.client.Get(ctx, client.ObjectKeyFromObject(desiredSnapshot), currentSnapshot); err != nil {
2✔
1042
                if !k8serrors.IsNotFound(err) {
1✔
UNCOV
1043
                        return err
×
UNCOV
1044
                }
×
1045
                cc.AddAnnotation(desiredSnapshot, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
1046
                if pvc.Spec.VolumeMode != nil {
2✔
1047
                        cc.AddAnnotation(desiredSnapshot, cc.AnnSourceVolumeMode, string(*pvc.Spec.VolumeMode))
1✔
1048
                }
1✔
1049
                if err := r.client.Create(ctx, desiredSnapshot); err != nil {
1✔
UNCOV
1050
                        return err
×
1051
                }
×
1052
        } else {
1✔
1053
                if cc.IsSnapshotReady(currentSnapshot) {
2✔
1054
                        // Clean up DV/PVC as they are not needed anymore
1✔
1055
                        r.log.Info("Deleting dv/pvc as snapshot is ready", "name", desiredSnapshot.Name)
1✔
1056
                        if err := r.deleteDvPvc(ctx, desiredSnapshot.Name, desiredSnapshot.Namespace); err != nil {
1✔
UNCOV
1057
                                return err
×
1058
                        }
×
1059
                }
1060
        }
1061

1062
        return nil
1✔
1063
}
1064

1065
func (r *DataImportCronReconciler) updateDataImportCronSuccessCondition(dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat, snapshot *snapshotv1.VolumeSnapshot) error {
1✔
1066
        dataImportCron.Status.SourceFormat = &format
1✔
1067

1✔
1068
        switch format {
1✔
1069
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
1070
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
1071
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
1072
                if snapshot == nil {
2✔
1073
                        // Snapshot create/update will trigger reconcile
1✔
1074
                        return nil
1✔
1075
                }
1✔
1076
                if cc.IsSnapshotReady(snapshot) {
2✔
1077
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
1078
                } else {
2✔
1079
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Snapshot of imported data is progressing", inProgress)
1✔
1080
                }
1✔
UNCOV
1081
        default:
×
UNCOV
1082
                return fmt.Errorf("unknown source format for snapshot")
×
1083
        }
1084

1085
        return nil
1✔
1086
}
1087

1088
func (r *DataImportCronReconciler) getSourceFormat(ctx context.Context, desiredStorageClass *storagev1.StorageClass) (cdiv1.DataImportCronSourceFormat, error) {
1✔
1089
        format := cdiv1.DataImportCronSourceFormatPvc
1✔
1090
        if desiredStorageClass == nil {
2✔
1091
                return format, nil
1✔
1092
        }
1✔
1093

1094
        storageProfile := &cdiv1.StorageProfile{}
1✔
1095
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
UNCOV
1096
                return format, err
×
UNCOV
1097
        }
×
1098
        if storageProfile.Status.DataImportCronSourceFormat != nil {
2✔
1099
                format = *storageProfile.Status.DataImportCronSourceFormat
1✔
1100
        }
1✔
1101

1102
        return format, nil
1✔
1103
}
1104

1105
func (r *DataImportCronReconciler) garbageCollectOldImports(ctx context.Context, cron *cdiv1.DataImportCron) error {
1✔
1106
        if cron.Spec.GarbageCollect != nil && *cron.Spec.GarbageCollect != cdiv1.DataImportCronGarbageCollectOutdated {
1✔
UNCOV
1107
                return nil
×
UNCOV
1108
        }
×
1109
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name})
1✔
1110
        if err != nil {
1✔
UNCOV
1111
                return err
×
UNCOV
1112
        }
×
1113

1114
        maxImports := defaultImportsToKeepPerCron
1✔
1115

1✔
1116
        if cron.Spec.ImportsToKeep != nil && *cron.Spec.ImportsToKeep >= 0 {
2✔
1117
                maxImports = int(*cron.Spec.ImportsToKeep)
1✔
1118
        }
1✔
1119

1120
        if err := r.garbageCollectPVCs(ctx, cron.Namespace, cron.Name, selector, maxImports); err != nil {
1✔
UNCOV
1121
                return err
×
UNCOV
1122
        }
×
1123
        if err := r.garbageCollectSnapshots(ctx, cron.Namespace, selector, maxImports); err != nil {
1✔
UNCOV
1124
                return err
×
UNCOV
1125
        }
×
1126

1127
        return nil
1✔
1128
}
1129

1130
func (r *DataImportCronReconciler) garbageCollectPVCs(ctx context.Context, namespace, cronName string, selector labels.Selector, maxImports int) error {
1✔
1131
        pvcList := &corev1.PersistentVolumeClaimList{}
1✔
1132

1✔
1133
        if err := r.client.List(ctx, pvcList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
UNCOV
1134
                return err
×
UNCOV
1135
        }
×
1136
        if len(pvcList.Items) > maxImports {
2✔
1137
                sort.Slice(pvcList.Items, func(i, j int) bool {
2✔
1138
                        return pvcList.Items[i].Annotations[AnnLastUseTime] > pvcList.Items[j].Annotations[AnnLastUseTime]
1✔
1139
                })
1✔
1140
                for _, pvc := range pvcList.Items[maxImports:] {
2✔
1141
                        r.log.Info("Deleting dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
1142
                        if err := r.deleteDvPvc(ctx, pvc.Name, pvc.Namespace); err != nil {
1✔
1143
                                return err
×
UNCOV
1144
                        }
×
1145
                }
1146
        }
1147

1148
        dvList := &cdiv1.DataVolumeList{}
1✔
1149
        if err := r.client.List(ctx, dvList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
UNCOV
1150
                return err
×
1151
        }
×
1152

1153
        if len(dvList.Items) > maxImports {
2✔
1154
                for _, dv := range dvList.Items {
2✔
1155
                        pvc := &corev1.PersistentVolumeClaim{}
1✔
1156
                        if err := r.client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: dv.Name}, pvc); err != nil {
1✔
UNCOV
1157
                                return err
×
1158
                        }
×
1159

1160
                        if pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
1161
                                r.log.Info("Deleting old version dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
1162
                                if err := r.deleteDvPvc(ctx, dv.Name, dv.Namespace); err != nil {
1✔
UNCOV
1163
                                        return err
×
UNCOV
1164
                                }
×
1165
                        }
1166
                }
1167
        }
1168

1169
        return nil
1✔
1170
}
1171

1172
// deleteDvPvc deletes DV or PVC if DV was GCed
1173
func (r *DataImportCronReconciler) deleteDvPvc(ctx context.Context, name, namespace string) error {
1✔
1174
        om := metav1.ObjectMeta{Name: name, Namespace: namespace}
1✔
1175
        dv := &cdiv1.DataVolume{ObjectMeta: om}
1✔
1176
        if err := r.client.Delete(ctx, dv); err == nil || !k8serrors.IsNotFound(err) {
2✔
1177
                return err
1✔
1178
        }
1✔
1179
        pvc := &corev1.PersistentVolumeClaim{ObjectMeta: om}
1✔
1180
        if err := r.client.Delete(ctx, pvc); err != nil && !k8serrors.IsNotFound(err) {
1✔
UNCOV
1181
                return err
×
UNCOV
1182
        }
×
1183
        return nil
1✔
1184
}
1185

1186
func (r *DataImportCronReconciler) garbageCollectSnapshots(ctx context.Context, namespace string, selector labels.Selector, maxImports int) error {
1✔
1187
        snapList := &snapshotv1.VolumeSnapshotList{}
1✔
1188

1✔
1189
        if err := r.client.List(ctx, snapList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
1190
                if meta.IsNoMatchError(err) {
×
UNCOV
1191
                        return nil
×
UNCOV
1192
                }
×
UNCOV
1193
                return err
×
1194
        }
1195
        if len(snapList.Items) > maxImports {
1✔
UNCOV
1196
                sort.Slice(snapList.Items, func(i, j int) bool {
×
UNCOV
1197
                        return snapList.Items[i].Annotations[AnnLastUseTime] > snapList.Items[j].Annotations[AnnLastUseTime]
×
1198
                })
×
1199
                for _, snap := range snapList.Items[maxImports:] {
×
1200
                        r.log.Info("Deleting snapshot", "name", snap.Name, "uid", snap.UID)
×
1201
                        if err := r.client.Delete(ctx, &snap); err != nil && !k8serrors.IsNotFound(err) {
×
UNCOV
1202
                                return err
×
UNCOV
1203
                        }
×
1204
                }
1205
        }
1206

1207
        return nil
1✔
1208
}
1209

1210
func (r *DataImportCronReconciler) cleanup(ctx context.Context, cron types.NamespacedName) error {
1✔
1211
        // Don't keep alerting over a cron thats being deleted, will get set back to 1 again by reconcile loop if needed.
1✔
1212
        metrics.DeleteDataImportCronOutdated(getPrometheusCronLabels(cron.Namespace, cron.Name))
1✔
1213

1✔
1214
        if err := r.deleteJobs(ctx, cron); err != nil {
1✔
UNCOV
1215
                return err
×
UNCOV
1216
        }
×
1217
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name, common.DataImportCronCleanupLabel: "true"})
1✔
1218
        if err != nil {
1✔
UNCOV
1219
                return err
×
UNCOV
1220
        }
×
1221
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: cron.Namespace, LabelSelector: selector}}
1✔
1222
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataSource{}, opts); err != nil {
1✔
1223
                return err
×
1224
        }
×
1225
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataVolume{}, opts); err != nil {
1✔
UNCOV
1226
                return err
×
1227
        }
×
1228
        if err := r.client.DeleteAllOf(ctx, &corev1.PersistentVolumeClaim{}, opts); err != nil {
1✔
UNCOV
1229
                return err
×
UNCOV
1230
        }
×
1231
        if err := r.client.DeleteAllOf(ctx, &snapshotv1.VolumeSnapshot{}, opts); cc.IgnoreIsNoMatchError(err) != nil {
1✔
1232
                return err
×
UNCOV
1233
        }
×
1234
        return nil
1✔
1235
}
1236

1237
func (r *DataImportCronReconciler) deleteJobs(ctx context.Context, cron types.NamespacedName) error {
1✔
1238
        deleteOpts := client.DeleteOptions{PropagationPolicy: ptr.To[metav1.DeletionPropagation](metav1.DeletePropagationBackground)}
1✔
1239
        selector, err := getSelector(map[string]string{common.DataImportCronNsLabel: cron.Namespace, common.DataImportCronLabel: cron.Name})
1✔
1240
        if err != nil {
1✔
1241
                return err
×
UNCOV
1242
        }
×
1243
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: r.cdiNamespace, LabelSelector: selector}, DeleteOptions: deleteOpts}
1✔
1244
        if err := r.client.DeleteAllOf(ctx, &batchv1.CronJob{}, opts); err != nil {
1✔
UNCOV
1245
                return err
×
UNCOV
1246
        }
×
1247
        if err := r.client.DeleteAllOf(ctx, &batchv1.Job{}, opts); err != nil {
1✔
UNCOV
1248
                return err
×
1249
        }
×
1250

1251
        return nil
1✔
1252
}
1253

1254
// NewDataImportCronController creates a new instance of the DataImportCron controller
UNCOV
1255
func NewDataImportCronController(mgr manager.Manager, log logr.Logger, importerImage, pullPolicy string, installerLabels map[string]string) (controller.Controller, error) {
×
1256
        uncachedClient, err := client.New(mgr.GetConfig(), client.Options{
×
1257
                Scheme: mgr.GetScheme(),
×
UNCOV
1258
                Mapper: mgr.GetRESTMapper(),
×
UNCOV
1259
        })
×
UNCOV
1260
        if err != nil {
×
UNCOV
1261
                return nil, err
×
UNCOV
1262
        }
×
1263
        reconciler := &DataImportCronReconciler{
×
1264
                client:          mgr.GetClient(),
×
1265
                uncachedClient:  uncachedClient,
×
1266
                recorder:        mgr.GetEventRecorderFor(dataImportControllerName),
×
1267
                scheme:          mgr.GetScheme(),
×
1268
                log:             log.WithName(dataImportControllerName),
×
1269
                image:           importerImage,
×
1270
                pullPolicy:      pullPolicy,
×
1271
                cdiNamespace:    util.GetNamespace(),
×
1272
                installerLabels: installerLabels,
×
1273
        }
×
1274
        dataImportCronController, err := controller.New(dataImportControllerName, mgr, controller.Options{
×
1275
                MaxConcurrentReconciles: 3,
×
1276
                Reconciler:              reconciler,
×
1277
        })
×
1278
        if err != nil {
×
1279
                return nil, err
×
1280
        }
×
1281
        if err := addDataImportCronControllerWatches(mgr, dataImportCronController); err != nil {
×
1282
                return nil, err
×
1283
        }
×
1284
        log.Info("Initialized DataImportCron controller")
×
1285
        return dataImportCronController, nil
×
1286
}
1287

1288
func getCronName(obj client.Object) string {
×
1289
        return obj.GetLabels()[common.DataImportCronLabel]
×
1290
}
×
1291

1292
func getCronNs(obj client.Object) string {
×
1293
        return obj.GetLabels()[common.DataImportCronNsLabel]
×
UNCOV
1294
}
×
1295

1296
func mapSourceObjectToCron[T client.Object](_ context.Context, obj T) []reconcile.Request {
×
1297
        if cronName := getCronName(obj); cronName != "" {
×
1298
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: cronName, Namespace: obj.GetNamespace()}}}
×
UNCOV
1299
        }
×
1300
        return nil
×
1301
}
1302

UNCOV
1303
func addDataImportCronControllerWatches(mgr manager.Manager, c controller.Controller) error {
×
1304
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataImportCron{}, &handler.TypedEnqueueRequestForObject[*cdiv1.DataImportCron]{})); err != nil {
×
1305
                return err
×
1306
        }
×
1307

1308
        mapStorageProfileToCron := func(ctx context.Context, obj *cdiv1.StorageProfile) []reconcile.Request {
×
UNCOV
1309
                // TODO: Get rid of this after at least one version; use indexer on storage class annotation instead
×
UNCOV
1310
                // Otherwise we risk losing the storage profile event
×
1311
                var crons cdiv1.DataImportCronList
×
1312
                if err := mgr.GetClient().List(ctx, &crons); err != nil {
×
1313
                        c.GetLogger().Error(err, "Unable to list DataImportCrons")
×
1314
                        return nil
×
UNCOV
1315
                }
×
1316
                // Storage profiles are 1:1 to storage classes
1317
                scName := obj.GetName()
×
1318
                var reqs []reconcile.Request
×
1319
                for _, cron := range crons.Items {
×
1320
                        dataVolume := cron.Spec.Template
×
1321
                        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
×
1322
                        templateSc, err := cc.GetStorageClassByNameWithVirtFallback(ctx, mgr.GetClient(), explicitScName, dataVolume.Spec.ContentType)
×
1323
                        if err != nil || templateSc == nil {
×
UNCOV
1324
                                c.GetLogger().Error(err, "Unable to get storage class", "templateSc", templateSc)
×
1325
                                return reqs
×
1326
                        }
×
1327
                        if templateSc.Name == scName {
×
1328
                                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: cron.Namespace, Name: cron.Name}})
×
1329
                        }
×
1330
                }
1331
                return reqs
×
1332
        }
1333

1334
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataVolume{},
×
1335
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataVolume](mapSourceObjectToCron),
×
1336
                predicate.TypedFuncs[*cdiv1.DataVolume]{
×
1337
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataVolume]) bool { return false },
×
UNCOV
1338
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataVolume]) bool { return getCronName(e.ObjectNew) != "" },
×
1339
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataVolume]) bool { return getCronName(e.Object) != "" },
×
1340
                },
UNCOV
1341
        )); err != nil {
×
1342
                return err
×
1343
        }
×
1344

1345
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataSource{},
×
1346
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataSource](mapSourceObjectToCron),
×
1347
                predicate.TypedFuncs[*cdiv1.DataSource]{
×
UNCOV
1348
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataSource]) bool { return false },
×
1349
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataSource]) bool { return getCronName(e.ObjectNew) != "" },
×
1350
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataSource]) bool { return getCronName(e.Object) != "" },
×
1351
                },
UNCOV
1352
        )); err != nil {
×
1353
                return err
×
1354
        }
×
1355

1356
        if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{},
×
1357
                handler.TypedEnqueueRequestsFromMapFunc[*corev1.PersistentVolumeClaim](mapSourceObjectToCron),
×
1358
                predicate.TypedFuncs[*corev1.PersistentVolumeClaim]{
×
UNCOV
1359
                        CreateFunc: func(event.TypedCreateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1360
                        UpdateFunc: func(event.TypedUpdateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1361
                        DeleteFunc: func(e event.TypedDeleteEvent[*corev1.PersistentVolumeClaim]) bool { return getCronName(e.Object) != "" },
×
1362
                },
UNCOV
1363
        )); err != nil {
×
1364
                return err
×
1365
        }
×
1366

1367
        if err := addDefaultStorageClassUpdateWatch(mgr, c); err != nil {
×
1368
                return err
×
1369
        }
×
1370

1371
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.StorageProfile{},
×
1372
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.StorageProfile](mapStorageProfileToCron),
×
1373
                predicate.TypedFuncs[*cdiv1.StorageProfile]{
×
UNCOV
1374
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.StorageProfile]) bool { return true },
×
1375
                        DeleteFunc: func(event.TypedDeleteEvent[*cdiv1.StorageProfile]) bool { return false },
×
1376
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.StorageProfile]) bool {
×
1377
                                return e.ObjectOld.Status.DataImportCronSourceFormat != e.ObjectNew.Status.DataImportCronSourceFormat
×
UNCOV
1378
                        },
×
1379
                },
1380
        )); err != nil {
×
1381
                return err
×
1382
        }
×
1383

1384
        mapCronJobToCron := func(_ context.Context, obj *batchv1.CronJob) []reconcile.Request {
×
1385
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Namespace: getCronNs(obj), Name: getCronName(obj)}}}
×
1386
        }
×
1387

1388
        if err := c.Watch(source.Kind(mgr.GetCache(), &batchv1.CronJob{},
×
1389
                handler.TypedEnqueueRequestsFromMapFunc[*batchv1.CronJob](mapCronJobToCron),
×
1390
                predicate.TypedFuncs[*batchv1.CronJob]{
×
UNCOV
1391
                        CreateFunc: func(e event.TypedCreateEvent[*batchv1.CronJob]) bool {
×
1392
                                return getCronName(e.Object) != "" && getCronNs(e.Object) != ""
×
1393
                        },
×
1394
                        DeleteFunc: func(event.TypedDeleteEvent[*batchv1.CronJob]) bool { return false },
×
UNCOV
1395
                        UpdateFunc: func(event.TypedUpdateEvent[*batchv1.CronJob]) bool { return false },
×
1396
                },
1397
        )); err != nil {
×
1398
                return err
×
1399
        }
×
1400

1401
        if err := mgr.GetClient().List(context.TODO(), &snapshotv1.VolumeSnapshotList{}); err != nil {
×
1402
                if meta.IsNoMatchError(err) {
×
1403
                        // Back out if there's no point to attempt watch
×
UNCOV
1404
                        return nil
×
1405
                }
×
1406
                if !cc.IsErrCacheNotStarted(err) {
×
1407
                        return err
×
UNCOV
1408
                }
×
1409
        }
1410
        if err := c.Watch(source.Kind(mgr.GetCache(), &snapshotv1.VolumeSnapshot{},
×
1411
                handler.TypedEnqueueRequestsFromMapFunc[*snapshotv1.VolumeSnapshot](mapSourceObjectToCron),
×
1412
                predicate.TypedFuncs[*snapshotv1.VolumeSnapshot]{
×
1413
                        CreateFunc: func(event.TypedCreateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1414
                        UpdateFunc: func(event.TypedUpdateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1415
                        DeleteFunc: func(e event.TypedDeleteEvent[*snapshotv1.VolumeSnapshot]) bool { return getCronName(e.Object) != "" },
×
1416
                },
UNCOV
1417
        )); err != nil {
×
1418
                return err
×
1419
        }
×
1420

1421
        return nil
×
1422
}
1423

1424
// addDefaultStorageClassUpdateWatch watches for default/virt default storage class updates
1425
func addDefaultStorageClassUpdateWatch(mgr manager.Manager, c controller.Controller) error {
×
1426
        if err := c.Watch(source.Kind(mgr.GetCache(), &storagev1.StorageClass{},
×
1427
                handler.TypedEnqueueRequestsFromMapFunc[*storagev1.StorageClass](
×
UNCOV
1428
                        func(ctx context.Context, obj *storagev1.StorageClass) []reconcile.Request {
×
1429
                                log := c.GetLogger().WithName("DefaultStorageClassUpdateWatch")
×
UNCOV
1430
                                log.Info("Update", "sc", obj.GetName(),
×
UNCOV
1431
                                        "default", obj.GetAnnotations()[cc.AnnDefaultStorageClass] == "true",
×
UNCOV
1432
                                        "defaultVirt", obj.GetAnnotations()[cc.AnnDefaultVirtStorageClass] == "true")
×
1433
                                reqs, err := getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx, mgr.GetClient())
×
1434
                                if err != nil {
×
1435
                                        log.Error(err, "Failed getting DataImportCrons with pending PVCs")
×
1436
                                }
×
1437
                                return reqs
×
1438
                        },
1439
                ),
1440
                predicate.TypedFuncs[*storagev1.StorageClass]{
1441
                        CreateFunc: func(event.TypedCreateEvent[*storagev1.StorageClass]) bool { return false },
×
1442
                        DeleteFunc: func(event.TypedDeleteEvent[*storagev1.StorageClass]) bool { return false },
×
1443
                        UpdateFunc: func(e event.TypedUpdateEvent[*storagev1.StorageClass]) bool {
×
1444
                                return (e.ObjectNew.Annotations[cc.AnnDefaultStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultStorageClass]) ||
×
1445
                                        (e.ObjectNew.Annotations[cc.AnnDefaultVirtStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultVirtStorageClass])
×
UNCOV
1446
                        },
×
1447
                },
UNCOV
1448
        )); err != nil {
×
1449
                return err
×
1450
        }
×
1451

1452
        return nil
×
1453
}
1454

UNCOV
1455
func getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx context.Context, c client.Client) ([]reconcile.Request, error) {
×
1456
        dicList := &cdiv1.DataImportCronList{}
×
1457
        if err := c.List(ctx, dicList); err != nil {
×
1458
                return nil, err
×
UNCOV
1459
        }
×
1460
        reqs := []reconcile.Request{}
×
UNCOV
1461
        for _, dic := range dicList.Items {
×
UNCOV
1462
                if cc.GetStorageClassFromDVSpec(&dic.Spec.Template) != nil {
×
1463
                        continue
×
1464
                }
1465

1466
                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Name: dic.Name, Namespace: dic.Namespace}})
×
1467
        }
1468

1469
        return reqs, nil
×
1470
}
1471

1472
func (r *DataImportCronReconciler) deleteOutdatedPendingPvc(ctx context.Context, pvc *corev1.PersistentVolumeClaim, desiredStorageClass, cronName string) (bool, error) {
1✔
1473
        if pvc == nil || pvc.Status.Phase != corev1.ClaimPending || pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
1474
                return false, nil
1✔
1475
        }
1✔
1476

1477
        sc := pvc.Spec.StorageClassName
1✔
1478
        if sc == nil || *sc == desiredStorageClass {
2✔
1479
                return false, nil
1✔
1480
        }
1✔
1481

1482
        r.log.Info("Delete pending pvc", "name", pvc.Name, "ns", pvc.Namespace, "sc", *sc)
1✔
1483
        if err := r.client.Delete(ctx, pvc); cc.IgnoreNotFound(err) != nil {
1✔
UNCOV
1484
                return false, err
×
UNCOV
1485
        }
×
1486

1487
        return true, nil
1✔
1488
}
1489

1490
func (r *DataImportCronReconciler) cronJobExistsAndUpdated(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
1491
        cronJob := &batchv1.CronJob{}
1✔
1492
        cronJobKey := types.NamespacedName{Namespace: r.cdiNamespace, Name: GetCronJobName(cron)}
1✔
1493
        if err := r.client.Get(ctx, cronJobKey, cronJob); err != nil {
2✔
1494
                return false, cc.IgnoreNotFound(err)
1✔
1495
        }
1✔
1496

1497
        cronJobCopy := cronJob.DeepCopy()
1✔
1498
        if err := r.initCronJob(cron, cronJobCopy); err != nil {
1✔
UNCOV
1499
                return false, err
×
UNCOV
1500
        }
×
1501

1502
        if !reflect.DeepEqual(cronJob, cronJobCopy) {
2✔
1503
                r.log.Info("Updating CronJob", "name", cronJob.GetName())
1✔
1504
                if err := r.client.Update(ctx, cronJobCopy); err != nil {
1✔
UNCOV
1505
                        return false, cc.IgnoreNotFound(err)
×
UNCOV
1506
                }
×
1507
        }
1508
        return true, nil
1✔
1509
}
1510

1511
func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batchv1.CronJob, error) {
1✔
1512
        cronJob := &batchv1.CronJob{
1✔
1513
                ObjectMeta: metav1.ObjectMeta{
1✔
1514
                        Name:      GetCronJobName(cron),
1✔
1515
                        Namespace: r.cdiNamespace,
1✔
1516
                },
1✔
1517
        }
1✔
1518
        if err := r.initCronJob(cron, cronJob); err != nil {
1✔
UNCOV
1519
                return nil, err
×
UNCOV
1520
        }
×
1521
        return cronJob, nil
1✔
1522
}
1523

1524
// InitPollerPod inits poller Pod
1525
func InitPollerPod(c client.Client, cron *cdiv1.DataImportCron, pod *corev1.PodTemplateSpec, image string, pullPolicy corev1.PullPolicy, log logr.Logger) error {
1✔
1526
        regSource, err := getCronRegistrySource(cron)
1✔
1527
        if err != nil {
1✔
1528
                return err
×
UNCOV
1529
        }
×
1530
        if regSource.URL == nil {
1✔
UNCOV
1531
                return errors.Errorf("No URL source in cron %s", cron.Name)
×
UNCOV
1532
        }
×
1533
        cdiConfig := &cdiv1.CDIConfig{}
1✔
1534
        if err := c.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiConfig); err != nil {
1✔
UNCOV
1535
                return err
×
1536
        }
×
1537
        insecureTLS, err := IsInsecureTLS(*regSource.URL, cdiConfig, log)
1✔
1538
        if err != nil {
1✔
1539
                return err
×
1540
        }
×
1541
        container := corev1.Container{
1✔
1542
                Name:  "cdi-source-update-poller",
1✔
1543
                Image: image,
1✔
1544
                Command: []string{
1✔
1545
                        "/usr/bin/cdi-source-update-poller",
1✔
1546
                        "-ns", cron.Namespace,
1✔
1547
                        "-cron", cron.Name,
1✔
1548
                        "-url", *regSource.URL,
1✔
1549
                },
1✔
1550
                ImagePullPolicy:          pullPolicy,
1✔
1551
                TerminationMessagePath:   corev1.TerminationMessagePathDefault,
1✔
1552
                TerminationMessagePolicy: corev1.TerminationMessageFallbackToLogsOnError,
1✔
1553
        }
1✔
1554

1✔
1555
        var volumes []corev1.Volume
1✔
1556
        hasCertConfigMap := regSource.CertConfigMap != nil && *regSource.CertConfigMap != ""
1✔
1557
        if hasCertConfigMap {
1✔
UNCOV
1558
                vm := corev1.VolumeMount{
×
UNCOV
1559
                        Name:      CertVolName,
×
UNCOV
1560
                        MountPath: common.ImporterCertDir,
×
UNCOV
1561
                }
×
UNCOV
1562
                container.VolumeMounts = append(container.VolumeMounts, vm)
×
UNCOV
1563
                container.Command = append(container.Command, "-certdir", common.ImporterCertDir)
×
UNCOV
1564
                volumes = append(volumes, createConfigMapVolume(CertVolName, *regSource.CertConfigMap))
×
UNCOV
1565
        }
×
1566

1567
        if volName, _ := GetImportProxyConfig(cdiConfig, common.ImportProxyConfigMapName); volName != "" {
2✔
1568
                vm := corev1.VolumeMount{
1✔
1569
                        Name:      ProxyCertVolName,
1✔
1570
                        MountPath: common.ImporterProxyCertDir,
1✔
1571
                }
1✔
1572
                container.VolumeMounts = append(container.VolumeMounts, vm)
1✔
1573
                volumes = append(volumes, createConfigMapVolume(ProxyCertVolName, volName))
1✔
1574
        }
1✔
1575

1576
        if regSource.SecretRef != nil && *regSource.SecretRef != "" {
1✔
UNCOV
1577
                container.Env = append(container.Env,
×
UNCOV
1578
                        corev1.EnvVar{
×
UNCOV
1579
                                Name: common.ImporterAccessKeyID,
×
UNCOV
1580
                                ValueFrom: &corev1.EnvVarSource{
×
UNCOV
1581
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
UNCOV
1582
                                                LocalObjectReference: corev1.LocalObjectReference{
×
UNCOV
1583
                                                        Name: *regSource.SecretRef,
×
UNCOV
1584
                                                },
×
1585
                                                Key: common.KeyAccess,
×
1586
                                        },
×
1587
                                },
×
1588
                        },
×
1589
                        corev1.EnvVar{
×
1590
                                Name: common.ImporterSecretKey,
×
1591
                                ValueFrom: &corev1.EnvVarSource{
×
1592
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1593
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1594
                                                        Name: *regSource.SecretRef,
×
1595
                                                },
×
1596
                                                Key: common.KeySecret,
×
1597
                                        },
×
1598
                                },
×
1599
                        },
×
1600
                )
×
1601
        }
×
1602

1603
        addEnvVar := func(varName, value string) {
2✔
1604
                container.Env = append(container.Env, corev1.EnvVar{Name: varName, Value: value})
1✔
1605
        }
1✔
1606

1607
        if insecureTLS {
1✔
1608
                addEnvVar(common.InsecureTLSVar, "true")
×
1609
        }
×
1610

1611
        addEnvVarFromImportProxyConfig := func(varName string) {
2✔
1612
                if value, err := GetImportProxyConfig(cdiConfig, varName); err == nil {
2✔
1613
                        addEnvVar(varName, value)
1✔
1614
                }
1✔
1615
        }
1616

1617
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTP)
1✔
1618
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTPS)
1✔
1619
        addEnvVarFromImportProxyConfig(common.ImportProxyNoProxy)
1✔
1620

1✔
1621
        imagePullSecrets, err := cc.GetImagePullSecrets(c)
1✔
1622
        if err != nil {
1✔
UNCOV
1623
                return err
×
UNCOV
1624
        }
×
1625
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(context.TODO(), c)
1✔
1626
        if err != nil {
1✔
UNCOV
1627
                return err
×
UNCOV
1628
        }
×
1629

1630
        podSpec := &pod.Spec
1✔
1631

1✔
1632
        podSpec.RestartPolicy = corev1.RestartPolicyNever
1✔
1633
        podSpec.TerminationGracePeriodSeconds = ptr.To[int64](0)
1✔
1634
        podSpec.Containers = []corev1.Container{container}
1✔
1635
        podSpec.ServiceAccountName = common.CronJobServiceAccountName
1✔
1636
        podSpec.Volumes = volumes
1✔
1637
        podSpec.ImagePullSecrets = imagePullSecrets
1✔
1638
        podSpec.NodeSelector = workloadNodePlacement.NodeSelector
1✔
1639
        podSpec.Tolerations = workloadNodePlacement.Tolerations
1✔
1640
        podSpec.Affinity = workloadNodePlacement.Affinity
1✔
1641

1✔
1642
        cc.SetRestrictedSecurityContext(podSpec)
1✔
1643
        // No need for specifid uid/fsgroup here since this doesn't write or use qemu
1✔
1644
        if podSpec.SecurityContext != nil {
2✔
1645
                podSpec.SecurityContext.FSGroup = nil
1✔
1646
        }
1✔
1647
        if podSpec.Containers[0].SecurityContext != nil {
2✔
1648
                podSpec.Containers[0].SecurityContext.RunAsUser = nil
1✔
1649
        }
1✔
1650

1651
        if pod.Labels == nil {
2✔
1652
                pod.Labels = map[string]string{}
1✔
1653
        }
1✔
1654
        pod.Labels[common.DataImportCronPollerLabel] = ""
1✔
1655

1✔
1656
        return nil
1✔
1657
}
1658

1659
func (r *DataImportCronReconciler) initCronJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) error {
1✔
1660
        cronJobSpec := &cronJob.Spec
1✔
1661
        cronJobSpec.Schedule = cron.Spec.Schedule
1✔
1662
        cronJobSpec.ConcurrencyPolicy = batchv1.ForbidConcurrent
1✔
1663
        cronJobSpec.SuccessfulJobsHistoryLimit = ptr.To[int32](1)
1✔
1664
        cronJobSpec.FailedJobsHistoryLimit = ptr.To[int32](1)
1✔
1665

1✔
1666
        jobSpec := &cronJobSpec.JobTemplate.Spec
1✔
1667
        jobSpec.BackoffLimit = ptr.To[int32](2)
1✔
1668
        jobSpec.TTLSecondsAfterFinished = ptr.To[int32](10)
1✔
1669
        cc.AddAnnotation(&jobSpec.Template, secv1.RequiredSCCAnnotation, common.RestrictedSCCName)
1✔
1670

1✔
1671
        pod := &jobSpec.Template
1✔
1672
        if err := InitPollerPod(r.client, cron, pod, r.image, corev1.PullPolicy(r.pullPolicy), r.log); err != nil {
1✔
UNCOV
1673
                return err
×
UNCOV
1674
        }
×
1675
        if err := r.setJobCommon(cron, cronJob); err != nil {
1✔
UNCOV
1676
                return err
×
UNCOV
1677
        }
×
1678
        return nil
1✔
1679
}
1680

1681
func (r *DataImportCronReconciler) newInitialJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) (*batchv1.Job, error) {
1✔
1682
        job := &batchv1.Job{
1✔
1683
                ObjectMeta: metav1.ObjectMeta{
1✔
1684
                        Name:      GetInitialJobName(cron),
1✔
1685
                        Namespace: cronJob.Namespace,
1✔
1686
                },
1✔
1687
                Spec: cronJob.Spec.JobTemplate.Spec,
1✔
1688
        }
1✔
1689
        if err := r.setJobCommon(cron, job); err != nil {
1✔
UNCOV
1690
                return nil, err
×
UNCOV
1691
        }
×
1692
        return job, nil
1✔
1693
}
1694

1695
func (r *DataImportCronReconciler) setJobCommon(cron *cdiv1.DataImportCron, obj metav1.Object) error {
1✔
1696
        if err := operator.SetOwnerRuntime(r.uncachedClient, obj); err != nil {
1✔
UNCOV
1697
                return err
×
1698
        }
×
1699
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1700
        labels := obj.GetLabels()
1✔
1701
        labels[common.DataImportCronNsLabel] = cron.Namespace
1✔
1702
        labels[common.DataImportCronLabel] = cron.Name
1✔
1703
        obj.SetLabels(labels)
1✔
1704
        return nil
1✔
1705
}
1706

1707
func (r *DataImportCronReconciler) newSourceDataVolume(cron *cdiv1.DataImportCron, dataVolumeName string) *cdiv1.DataVolume {
1✔
1708
        dv := cron.Spec.Template.DeepCopy()
1✔
1709
        if isCronRegistrySource(cron) {
2✔
1710
                var digestedURL string
1✔
1711
                if isURLSource(cron) {
2✔
1712
                        digestedURL = untagDigestedDockerURL(*dv.Spec.Source.Registry.URL + "@" + cron.Annotations[AnnSourceDesiredDigest])
1✔
1713
                } else if isImageStreamSource(cron) {
3✔
1714
                        // No way to import image stream by name when we want specific digest, so we use its docker reference
1✔
1715
                        digestedURL = "docker://" + cron.Annotations[AnnImageStreamDockerRef]
1✔
1716
                        dv.Spec.Source.Registry.ImageStream = nil
1✔
1717
                }
1✔
1718
                dv.Spec.Source.Registry.URL = &digestedURL
1✔
1719
        }
1720
        dv.Name = dataVolumeName
1✔
1721
        dv.Namespace = cron.Namespace
1✔
1722
        r.setDataImportCronResourceLabels(cron, dv)
1✔
1723
        cc.AddAnnotation(dv, cc.AnnImmediateBinding, "true")
1✔
1724
        cc.AddAnnotation(dv, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
1725
        passCronAnnotationToDv(cron, dv, cc.AnnPodRetainAfterCompletion)
1✔
1726

1✔
1727
        for _, defaultInstanceTypeLabel := range cc.DefaultInstanceTypeLabels {
2✔
1728
                passCronLabelToDv(cron, dv, defaultInstanceTypeLabel)
1✔
1729
        }
1✔
1730

1731
        passCronLabelToDv(cron, dv, cc.LabelDynamicCredentialSupport)
1✔
1732

1✔
1733
        return dv
1✔
1734
}
1735

1736
func (r *DataImportCronReconciler) setDataImportCronResourceLabels(cron *cdiv1.DataImportCron, obj metav1.Object) {
1✔
1737
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1738
        cc.CopyAllowedLabels(cron.GetLabels(), obj, true)
1✔
1739
        labels := obj.GetLabels()
1✔
1740
        labels[common.DataImportCronLabel] = cron.Name
1✔
1741
        if cron.Spec.RetentionPolicy != nil && *cron.Spec.RetentionPolicy == cdiv1.DataImportCronRetainNone {
2✔
1742
                labels[common.DataImportCronCleanupLabel] = "true"
1✔
1743
        }
1✔
1744
        obj.SetLabels(labels)
1✔
1745
}
1746

1747
func untagDigestedDockerURL(dockerURL string) string {
1✔
1748
        if u, err := url.Parse(dockerURL); err == nil {
2✔
1749
                url := u.Host + u.Path
1✔
1750
                subs := reference.ReferenceRegexp.FindStringSubmatch(url)
1✔
1751
                // Check for tag
1✔
1752
                if len(subs) > 2 && len(subs[2]) > 0 {
2✔
1753
                        if untaggedRef, err := reference.ParseDockerRef(url); err == nil {
2✔
1754
                                return u.Scheme + "://" + untaggedRef.String()
1✔
1755
                        }
1✔
1756
                }
1757
        }
1758
        return dockerURL
1✔
1759
}
1760

1761
func passCronLabelToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1762
        if val := cron.Labels[ann]; val != "" {
2✔
1763
                cc.AddLabel(dv, ann, val)
1✔
1764
        }
1✔
1765
}
1766

1767
func passCronAnnotationToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1768
        if val := cron.Annotations[ann]; val != "" {
1✔
UNCOV
1769
                cc.AddAnnotation(dv, ann, val)
×
UNCOV
1770
        }
×
1771
}
1772

1773
func (r *DataImportCronReconciler) newDataSource(cron *cdiv1.DataImportCron) *cdiv1.DataSource {
1✔
1774
        dataSource := &cdiv1.DataSource{
1✔
1775
                ObjectMeta: metav1.ObjectMeta{
1✔
1776
                        Name:      cron.Spec.ManagedDataSource,
1✔
1777
                        Namespace: cron.Namespace,
1✔
1778
                },
1✔
1779
        }
1✔
1780
        util.SetRecommendedLabels(dataSource, r.installerLabels, common.CDIControllerName)
1✔
1781
        dataSource.Labels[common.DataImportCronLabel] = cron.Name
1✔
1782
        cc.CopyAllowedLabels(cron.GetLabels(), dataSource, true)
1✔
1783
        return dataSource
1✔
1784
}
1✔
1785

1786
// Create DataVolume name based on the DataSource name + prefix of the digest
1787
func createDvName(prefix, digest string) (string, error) {
1✔
1788
        digestPrefix := ""
1✔
1789
        if strings.HasPrefix(digest, digestSha256Prefix) {
2✔
1790
                digestPrefix = digestSha256Prefix
1✔
1791
        } else if strings.HasPrefix(digest, digestUIDPrefix) {
3✔
1792
                digestPrefix = digestUIDPrefix
1✔
1793
        } else {
2✔
1794
                return "", errors.Errorf("Digest has no supported prefix")
1✔
1795
        }
1✔
1796
        fromIdx := len(digestPrefix)
1✔
1797
        toIdx := fromIdx + digestDvNameSuffixLength
1✔
1798
        if len(digest) < toIdx {
2✔
1799
                return "", errors.Errorf("Digest is too short")
1✔
1800
        }
1✔
1801
        return naming.GetResourceName(prefix, digest[fromIdx:toIdx]), nil
1✔
1802
}
1803

1804
// GetCronJobName get CronJob name based on cron name and UID
1805
func GetCronJobName(cron *cdiv1.DataImportCron) string {
1✔
1806
        return naming.GetResourceName(cron.Name, string(cron.UID)[:cronJobUIDSuffixLength])
1✔
1807
}
1✔
1808

1809
// GetInitialJobName get initial job name based on cron name and UID
1810
func GetInitialJobName(cron *cdiv1.DataImportCron) string {
1✔
1811
        return naming.GetResourceName("initial-job", GetCronJobName(cron))
1✔
1812
}
1✔
1813

1814
func getPollerPodName(cron *cdiv1.DataImportCron) string {
1✔
1815
        return naming.GetResourceName("poller-"+cron.Name, string(cron.UID)[:8])
1✔
1816
}
1✔
1817

1818
func getSelector(matchLabels map[string]string) (labels.Selector, error) {
1✔
1819
        return metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: matchLabels})
1✔
1820
}
1✔
1821

1822
func inferVolumeModeForSnapshot(ctx context.Context, client client.Client, cron *cdiv1.DataImportCron) (*corev1.PersistentVolumeMode, error) {
1✔
1823
        dv := &cron.Spec.Template
1✔
1824

1✔
1825
        if explicitVolumeMode := getVolumeModeFromDVSpec(dv); explicitVolumeMode != nil {
1✔
UNCOV
1826
                return explicitVolumeMode, nil
×
UNCOV
1827
        }
×
1828

1829
        accessModes := getAccessModesFromDVSpec(dv)
1✔
1830
        inferredPvc := &corev1.PersistentVolumeClaim{
1✔
1831
                Spec: corev1.PersistentVolumeClaimSpec{
1✔
1832
                        StorageClassName: cc.GetStorageClassFromDVSpec(dv),
1✔
1833
                        AccessModes:      accessModes,
1✔
1834
                        VolumeMode:       ptr.To(cdiv1.PersistentVolumeFromStorageProfile),
1✔
1835
                        Resources: corev1.VolumeResourceRequirements{
1✔
1836
                                Requests: corev1.ResourceList{
1✔
1837
                                        // Doesn't matter
1✔
1838
                                        corev1.ResourceStorage: resource.MustParse("1Gi"),
1✔
1839
                                },
1✔
1840
                        },
1✔
1841
                },
1✔
1842
        }
1✔
1843
        if err := dvc.RenderPvc(ctx, client, inferredPvc); err != nil {
1✔
UNCOV
1844
                return nil, err
×
UNCOV
1845
        }
×
1846

1847
        return inferredPvc.Spec.VolumeMode, nil
1✔
1848
}
1849

1850
// getVolumeModeFromDVSpec returns the volume mode from DataVolume PVC or Storage spec
1851
func getVolumeModeFromDVSpec(dv *cdiv1.DataVolume) *corev1.PersistentVolumeMode {
1✔
1852
        if dv.Spec.PVC != nil {
1✔
1853
                return dv.Spec.PVC.VolumeMode
×
UNCOV
1854
        }
×
1855

1856
        if dv.Spec.Storage != nil {
2✔
1857
                return dv.Spec.Storage.VolumeMode
1✔
1858
        }
1✔
1859

UNCOV
1860
        return nil
×
1861
}
1862

1863
// getAccessModesFromDVSpec returns the access modes from DataVolume PVC or Storage spec
1864
func getAccessModesFromDVSpec(dv *cdiv1.DataVolume) []corev1.PersistentVolumeAccessMode {
1✔
1865
        if dv.Spec.PVC != nil {
1✔
UNCOV
1866
                return dv.Spec.PVC.AccessModes
×
UNCOV
1867
        }
×
1868

1869
        if dv.Spec.Storage != nil {
2✔
1870
                return dv.Spec.Storage.AccessModes
1✔
1871
        }
1✔
1872

UNCOV
1873
        return nil
×
1874
}
1875

1876
// parseServiceAccount extracts namespace and service account name from username
1877
// Username format: "system:serviceaccount:<namespace>:<serviceaccount-name>"
NEW
UNCOV
1878
func parseServiceAccount(username string) (namespace, saName string, ok bool) {
×
NEW
UNCOV
1879
        const prefix = "system:serviceaccount:"
×
NEW
UNCOV
1880
        if !strings.HasPrefix(username, prefix) {
×
NEW
1881
                return "", "", false
×
NEW
UNCOV
1882
        }
×
NEW
1883
        parts := strings.Split(strings.TrimPrefix(username, prefix), ":")
×
NEW
1884
        if len(parts) != 2 {
×
NEW
1885
                return "", "", false
×
NEW
1886
        }
×
NEW
1887
        return parts[0], parts[1], true
×
1888
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc