• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #4879

16 Aug 2024 03:54AM UTC coverage: 59.187% (-0.08%) from 59.27%
#4879

push

travis-ci

web-flow
Setup ginkgo cli build properly to avoid double dep (#3378)

* Setup ginkgo cli build properly to avoid double dep

Today we have the ginkgo CLI brought into the builder and
also to the project itself. This results in
```
 Ginkgo detected a version mismatch between the Ginkgo CLI and the version of Ginkgo imported by your packages:
  Ginkgo CLI Version:
    2.12.0
  Mismatched package versions found:
    2.17.1 used by tests
```
This commit provides the necessary build adaptations to get rid of the
builder ginkgo CLI dependency.

Signed-off-by: Alex Kalenyuk <akalenyu@redhat.com>

* update builder to latest

https://github.com/kubevirt/containerized-data-importer/pull/3379

Signed-off-by: Alex Kalenyuk <akalenyu@redhat.com>

---------

Signed-off-by: Alex Kalenyuk <akalenyu@redhat.com>

16609 of 28062 relevant lines covered (59.19%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.07
/pkg/controller/dataimportcron-controller.go
1
/*
2
Copyright 2021 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "fmt"
22
        "net/url"
23
        "reflect"
24
        "sort"
25
        "strings"
26
        "time"
27

28
        "github.com/containers/image/v5/docker/reference"
29
        "github.com/go-logr/logr"
30
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
31
        imagev1 "github.com/openshift/api/image/v1"
32
        "github.com/pkg/errors"
33
        cronexpr "github.com/robfig/cron/v3"
34

35
        batchv1 "k8s.io/api/batch/v1"
36
        corev1 "k8s.io/api/core/v1"
37
        storagev1 "k8s.io/api/storage/v1"
38
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
39
        "k8s.io/apimachinery/pkg/api/meta"
40
        "k8s.io/apimachinery/pkg/api/resource"
41
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
42
        "k8s.io/apimachinery/pkg/labels"
43
        "k8s.io/apimachinery/pkg/runtime"
44
        "k8s.io/apimachinery/pkg/types"
45
        "k8s.io/client-go/tools/record"
46
        "k8s.io/utils/ptr"
47

48
        "sigs.k8s.io/controller-runtime/pkg/client"
49
        "sigs.k8s.io/controller-runtime/pkg/controller"
50
        "sigs.k8s.io/controller-runtime/pkg/event"
51
        "sigs.k8s.io/controller-runtime/pkg/handler"
52
        "sigs.k8s.io/controller-runtime/pkg/manager"
53
        "sigs.k8s.io/controller-runtime/pkg/predicate"
54
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
55
        "sigs.k8s.io/controller-runtime/pkg/source"
56

57
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
58
        "kubevirt.io/containerized-data-importer/pkg/common"
59
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
60
        dvc "kubevirt.io/containerized-data-importer/pkg/controller/datavolume"
61
        metrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-controller"
62
        "kubevirt.io/containerized-data-importer/pkg/operator"
63
        "kubevirt.io/containerized-data-importer/pkg/util"
64
        "kubevirt.io/containerized-data-importer/pkg/util/naming"
65
)
66

67
const (
68
        // ErrDataSourceAlreadyManaged provides a const to indicate DataSource already managed error
69
        ErrDataSourceAlreadyManaged = "ErrDataSourceAlreadyManaged"
70
        // MessageDataSourceAlreadyManaged provides a const to form DataSource already managed error message
71
        MessageDataSourceAlreadyManaged = "DataSource %s is already managed by DataImportCron %s"
72
)
73

74
// DataImportCronReconciler reconciles DataImportCron objects.
type DataImportCronReconciler struct {
	// client is the API client the reconcile logic uses to get, create, update and delete objects.
	client client.Client
	// NOTE(review): uncachedClient is not referenced in the portion of the file reviewed here;
	// presumably it reads directly from the API server — confirm against its users.
	uncachedClient client.Client
	// recorder emits events, e.g. the warning raised when a DataSource is already managed by another cron.
	recorder record.EventRecorder
	scheme   *runtime.Scheme
	// log is the base logger; methods derive named or valued loggers from it.
	log logr.Logger
	// NOTE(review): image, pullPolicy, cdiNamespace and installerLabels are not referenced in the
	// portion of the file reviewed here — document them next to the code that consumes them.
	image           string
	pullPolicy      string
	cdiNamespace    string
	installerLabels map[string]string
}
86

87
const (
88
        // AnnSourceDesiredDigest is the digest of the pending updated image
89
        AnnSourceDesiredDigest = cc.AnnAPIGroup + "/storage.import.sourceDesiredDigest"
90
        // AnnImageStreamDockerRef is the ImageStream Docker reference
91
        AnnImageStreamDockerRef = cc.AnnAPIGroup + "/storage.import.imageStreamDockerRef"
92
        // AnnNextCronTime is the next time stamp which satisfies the cron expression
93
        AnnNextCronTime = cc.AnnAPIGroup + "/storage.import.nextCronTime"
94
        // AnnLastCronTime is the cron last execution time stamp
95
        AnnLastCronTime = cc.AnnAPIGroup + "/storage.import.lastCronTime"
96
        // AnnLastUseTime is the PVC last use time stamp
97
        AnnLastUseTime = cc.AnnAPIGroup + "/storage.import.lastUseTime"
98
        // AnnStorageClass is the cron DV's storage class
99
        AnnStorageClass = cc.AnnAPIGroup + "/storage.import.storageClass"
100

101
        dataImportControllerName    = "dataimportcron-controller"
102
        digestPrefix                = "sha256:"
103
        digestDvNameSuffixLength    = 12
104
        cronJobUIDSuffixLength      = 8
105
        defaultImportsToKeepPerCron = 3
106
)
107

108
// ErrNotManagedByCron is returned by getDataSource when the DataSource exists but its
// DataImportCron label does not match the name of the cron being reconciled.
var ErrNotManagedByCron = errors.New("DataSource is not managed by this DataImportCron")
109

110
// Reconcile loop for DataImportCronReconciler
111
func (r *DataImportCronReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
1✔
112
        dataImportCron := &cdiv1.DataImportCron{}
1✔
113
        if err := r.client.Get(ctx, req.NamespacedName, dataImportCron); cc.IgnoreNotFound(err) != nil {
2✔
114
                return reconcile.Result{}, err
1✔
115
        } else if err != nil || dataImportCron.DeletionTimestamp != nil {
3✔
116
                err := r.cleanup(ctx, req.NamespacedName)
1✔
117
                return reconcile.Result{}, err
1✔
118
        }
1✔
119
        shouldReconcile, err := r.shouldReconcileCron(ctx, dataImportCron)
1✔
120
        if !shouldReconcile || err != nil {
1✔
121
                return reconcile.Result{}, err
×
122
        }
×
123

124
        if err := r.initCron(ctx, dataImportCron); err != nil {
1✔
125
                return reconcile.Result{}, err
×
126
        }
×
127

128
        return r.update(ctx, dataImportCron)
1✔
129
}
130

131
func (r *DataImportCronReconciler) shouldReconcileCron(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
132
        dataSource := &cdiv1.DataSource{}
1✔
133
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: cron.Spec.ManagedDataSource}, dataSource); err != nil {
2✔
134
                if k8serrors.IsNotFound(err) {
2✔
135
                        return true, nil
1✔
136
                }
1✔
137
                return false, err
×
138
        }
139
        dataSourceCronLabel := dataSource.Labels[common.DataImportCronLabel]
1✔
140
        if dataSourceCronLabel == cron.Name || dataSourceCronLabel == "" {
2✔
141
                return true, nil
1✔
142
        }
1✔
143
        otherCron := &cdiv1.DataImportCron{}
1✔
144
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dataSourceCronLabel}, otherCron); err != nil {
2✔
145
                if k8serrors.IsNotFound(err) {
2✔
146
                        return true, nil
1✔
147
                }
1✔
148
                return false, err
×
149
        }
150
        if otherCron.Spec.ManagedDataSource == dataSource.Name {
2✔
151
                msg := fmt.Sprintf(MessageDataSourceAlreadyManaged, dataSource.Name, otherCron.Name)
1✔
152
                r.recorder.Event(cron, corev1.EventTypeWarning, ErrDataSourceAlreadyManaged, msg)
1✔
153
                r.log.V(3).Info(msg)
1✔
154
                return false, nil
1✔
155
        }
1✔
156
        return true, nil
×
157
}
158

159
func (r *DataImportCronReconciler) initCron(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
160
        if dataImportCron.Spec.Schedule == "" {
2✔
161
                return nil
1✔
162
        }
1✔
163
        if isImageStreamSource(dataImportCron) {
2✔
164
                if dataImportCron.Annotations[AnnNextCronTime] == "" {
2✔
165
                        cc.AddAnnotation(dataImportCron, AnnNextCronTime, time.Now().Format(time.RFC3339))
1✔
166
                }
1✔
167
                return nil
1✔
168
        }
169
        if !isURLSource(dataImportCron) {
1✔
170
                return nil
×
171
        }
×
172
        exists, err := r.cronJobExistsAndUpdated(ctx, dataImportCron)
1✔
173
        if exists || err != nil {
2✔
174
                return err
1✔
175
        }
1✔
176
        cronJob, err := r.newCronJob(dataImportCron)
1✔
177
        if err != nil {
1✔
178
                return err
×
179
        }
×
180
        if err := r.client.Create(ctx, cronJob); err != nil {
1✔
181
                return err
×
182
        }
×
183
        job, err := r.newInitialJob(dataImportCron, cronJob)
1✔
184
        if err != nil {
1✔
185
                return err
×
186
        }
×
187
        if err := r.client.Create(ctx, job); err != nil {
1✔
188
                return err
×
189
        }
×
190
        return nil
1✔
191
}
192

193
func (r *DataImportCronReconciler) getImageStream(ctx context.Context, imageStreamName, imageStreamNamespace string) (*imagev1.ImageStream, string, error) {
1✔
194
        if imageStreamName == "" || imageStreamNamespace == "" {
1✔
195
                return nil, "", errors.Errorf("Missing ImageStream name or namespace")
×
196
        }
×
197
        imageStream := &imagev1.ImageStream{}
1✔
198
        name, tag, err := splitImageStreamName(imageStreamName)
1✔
199
        if err != nil {
2✔
200
                return nil, "", err
1✔
201
        }
1✔
202
        imageStreamNamespacedName := types.NamespacedName{
1✔
203
                Namespace: imageStreamNamespace,
1✔
204
                Name:      name,
1✔
205
        }
1✔
206
        if err := r.client.Get(ctx, imageStreamNamespacedName, imageStream); err != nil {
2✔
207
                return nil, "", err
1✔
208
        }
1✔
209
        return imageStream, tag, nil
1✔
210
}
211

212
func getImageStreamDigest(imageStream *imagev1.ImageStream, imageStreamTag string) (string, string, error) {
1✔
213
        if imageStream == nil {
1✔
214
                return "", "", errors.Errorf("No ImageStream")
×
215
        }
×
216
        tags := imageStream.Status.Tags
1✔
217
        if len(tags) == 0 {
1✔
218
                return "", "", errors.Errorf("ImageStream %s has no tags", imageStream.Name)
×
219
        }
×
220

221
        tagIdx := 0
1✔
222
        if len(imageStreamTag) > 0 {
2✔
223
                tagIdx = -1
1✔
224
                for i, tag := range tags {
2✔
225
                        if tag.Tag == imageStreamTag {
2✔
226
                                tagIdx = i
1✔
227
                                break
1✔
228
                        }
229
                }
230
        }
231
        if tagIdx == -1 {
2✔
232
                return "", "", errors.Errorf("ImageStream %s has no tag %s", imageStream.Name, imageStreamTag)
1✔
233
        }
1✔
234

235
        if len(tags[tagIdx].Items) == 0 {
2✔
236
                return "", "", errors.Errorf("ImageStream %s tag %s has no items", imageStream.Name, imageStreamTag)
1✔
237
        }
1✔
238
        return tags[tagIdx].Items[0].Image, tags[tagIdx].Items[0].DockerImageReference, nil
1✔
239
}
240

241
func splitImageStreamName(imageStreamName string) (string, string, error) {
1✔
242
        if subs := strings.Split(imageStreamName, ":"); len(subs) == 1 {
2✔
243
                return imageStreamName, "", nil
1✔
244
        } else if len(subs) == 2 && len(subs[0]) > 0 && len(subs[1]) > 0 {
3✔
245
                return subs[0], subs[1], nil
1✔
246
        }
1✔
247
        return "", "", errors.Errorf("Illegal ImageStream name %s", imageStreamName)
1✔
248
}
249

250
func (r *DataImportCronReconciler) pollImageStreamDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
251
        if nextTimeStr := dataImportCron.Annotations[AnnNextCronTime]; nextTimeStr != "" {
2✔
252
                nextTime, err := time.Parse(time.RFC3339, nextTimeStr)
1✔
253
                if err != nil {
1✔
254
                        return reconcile.Result{}, err
×
255
                }
×
256
                if nextTime.Before(time.Now()) {
2✔
257
                        if err := r.updateImageStreamDesiredDigest(ctx, dataImportCron); err != nil {
2✔
258
                                return reconcile.Result{}, err
1✔
259
                        }
1✔
260
                }
261
        }
262
        return r.setNextCronTime(dataImportCron)
1✔
263
}
264

265
func (r *DataImportCronReconciler) setNextCronTime(dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
266
        now := time.Now()
1✔
267
        expr, err := cronexpr.ParseStandard(dataImportCron.Spec.Schedule)
1✔
268
        if err != nil {
1✔
269
                return reconcile.Result{}, err
×
270
        }
×
271
        nextTime := expr.Next(now)
1✔
272
        requeueAfter := nextTime.Sub(now)
1✔
273
        res := reconcile.Result{Requeue: true, RequeueAfter: requeueAfter}
1✔
274
        cc.AddAnnotation(dataImportCron, AnnNextCronTime, nextTime.Format(time.RFC3339))
1✔
275
        return res, err
1✔
276
}
277

278
func isImageStreamSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
279
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
280
        return err == nil && regSource.ImageStream != nil
1✔
281
}
1✔
282

283
func isURLSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
284
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
285
        return err == nil && regSource.URL != nil
1✔
286
}
1✔
287

288
func getCronRegistrySource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourceRegistry, error) {
1✔
289
        source := cron.Spec.Template.Spec.Source
1✔
290
        if source == nil || source.Registry == nil {
1✔
291
                return nil, errors.Errorf("Cron with no registry source %s", cron.Name)
×
292
        }
×
293
        return source.Registry, nil
1✔
294
}
295

296
// update drives the main state machine of a DataImportCron. It inspects the current
// import (DataVolume, PVC or snapshot), updates the Progressing and UpToDate conditions,
// keeps the managed DataSource in sync, polls the ImageStream digest when applicable,
// creates a new import DataVolume when the desired digest changed, and finally persists
// the cron if anything on it was modified. The returned Result carries the requeue
// settings produced by ImageStream polling, if any.
func (r *DataImportCronReconciler) update(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
	res := reconcile.Result{}

	dv, pvc, err := r.getImportState(ctx, dataImportCron)
	if err != nil {
		return res, err
	}

	// Snapshot of the cron taken before any mutation; compared at the end to decide whether to Update.
	dataImportCronCopy := dataImportCron.DeepCopy()
	imports := dataImportCron.Status.CurrentImports
	importSucceeded := false

	dataVolume := dataImportCron.Spec.Template
	explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
	desiredStorageClass, err := cc.GetStorageClassByNameWithVirtFallback(ctx, r.client, explicitScName, dataVolume.Spec.ContentType)
	if err != nil {
		return res, err
	}
	if desiredStorageClass != nil {
		// Stop this pass if an outdated pending PVC was deleted (or the attempt failed).
		if deleted, err := r.deleteOutdatedPendingPvc(ctx, pvc, desiredStorageClass.Name, dataImportCron.Name); deleted || err != nil {
			return res, err
		}
		cc.AddAnnotation(dataImportCron, AnnStorageClass, desiredStorageClass.Name)
	}
	format, err := r.getSourceFormat(ctx, desiredStorageClass)
	if err != nil {
		return res, err
	}
	snapshot, err := r.getSnapshot(ctx, dataImportCron)
	if err != nil {
		return res, err
	}

	// handlePopulatedPvc marks the import as succeeded, refreshing the PVC's last-use
	// annotation and labels when a PVC exists, and then applies the cron format handling.
	handlePopulatedPvc := func() error {
		if pvc != nil {
			if err := r.updateSource(ctx, dataImportCron, pvc); err != nil {
				return err
			}
		}
		importSucceeded = true
		if err := r.handleCronFormat(ctx, dataImportCron, pvc, format, desiredStorageClass); err != nil {
			return err
		}

		return nil
	}

	// Precedence: an existing DataVolume first, then a bound PVC, then a snapshot.
	switch {
	case dv != nil:
		switch dv.Status.Phase {
		case cdiv1.Succeeded:
			if err := handlePopulatedPvc(); err != nil {
				return res, err
			}
		case cdiv1.ImportScheduled:
			updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "Import is scheduled", scheduled)
		case cdiv1.ImportInProgress:
			updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionTrue, "Import is progressing", inProgress)
		default:
			// Any other phase is surfaced verbatim as the condition reason.
			dvPhase := string(dv.Status.Phase)
			updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dvPhase), dvPhase)
		}
	case pvc != nil && pvc.Status.Phase == corev1.ClaimBound:
		// TODO: with plain populator PVCs (no DataVolumes) we may need to wait for corev1.Bound
		if err := handlePopulatedPvc(); err != nil {
			return res, err
		}
	case snapshot != nil:
		if format == cdiv1.DataImportCronSourceFormatPvc {
			// The format is PVC, so the snapshot is deleted and the cron is requeued after one second.
			if err := r.client.Delete(ctx, snapshot); cc.IgnoreNotFound(err) != nil {
				return res, err
			}
			r.log.Info("Snapshot is around even though format switched to PVC, requeueing")
			return reconcile.Result{RequeueAfter: time.Second}, nil
		}
		// Below k8s 1.29 there's no way to know the source volume mode
		// Let's at least expose this info on our own snapshots
		if _, ok := snapshot.Annotations[cc.AnnSourceVolumeMode]; !ok {
			volMode, err := inferVolumeModeForSnapshot(ctx, r.client, dataImportCron)
			if err != nil {
				return res, err
			}
			if volMode != nil {
				cc.AddAnnotation(snapshot, cc.AnnSourceVolumeMode, string(*volMode))
			}
		}
		// Copy labels found on dataSource to the existing snapshot in case of upgrades.
		dataSource, err := r.getDataSource(ctx, dataImportCron)
		if err != nil {
			// A missing or foreign-managed DataSource is tolerated; labels are simply not copied.
			if !k8serrors.IsNotFound(err) && !errors.Is(err, ErrNotManagedByCron) {
				return res, err
			}
		} else {
			cc.CopyAllowedLabels(dataSource.Labels, snapshot, true)
		}
		if err := r.updateSource(ctx, dataImportCron, snapshot); err != nil {
			return res, err
		}
		importSucceeded = true
	default:
		// Neither DataVolume, bound PVC nor snapshot exists for the first import entry: drop it.
		if len(imports) > 0 {
			imports = imports[1:]
			dataImportCron.Status.CurrentImports = imports
		}
		updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
	}

	if importSucceeded {
		if err := updateDataImportCronOnSuccess(dataImportCron); err != nil {
			return res, err
		}
		updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
		if err := r.garbageCollectOldImports(ctx, dataImportCron); err != nil {
			return res, err
		}
	}

	if err := r.updateDataSource(ctx, dataImportCron, format); err != nil {
		return res, err
	}

	// Skip if schedule is disabled
	if isImageStreamSource(dataImportCron) && dataImportCron.Spec.Schedule != "" {
		// We use the poll returned reconcile.Result for RequeueAfter if needed
		pollRes, err := r.pollImageStreamDigest(ctx, dataImportCron)
		if err != nil {
			return pollRes, err
		}
		res = pollRes
	}

	desiredDigest := dataImportCron.Annotations[AnnSourceDesiredDigest]
	// The digest counts as updated when one is desired and it differs from the first
	// remaining import entry (or there is no import entry at all).
	digestUpdated := desiredDigest != "" && (len(imports) == 0 || desiredDigest != imports[0].Digest)
	if digestUpdated {
		updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Source digest updated since last import", outdated)
		if dv != nil {
			if err := r.deleteErroneousDataVolume(ctx, dataImportCron, dv); err != nil {
				return res, err
			}
		}
		// NOTE(review): imports is the local slice captured above; it is not refreshed if
		// deleteErroneousDataVolume just cleared Status.CurrentImports — confirm this is intended.
		if importSucceeded || len(imports) == 0 {
			if err := r.createImportDataVolume(ctx, dataImportCron); err != nil {
				return res, err
			}
		}
	} else if importSucceeded {
		if err := r.updateDataImportCronSuccessCondition(dataImportCron, format, snapshot); err != nil {
			return res, err
		}
	} else if len(imports) > 0 {
		updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Import is progressing", inProgress)
	} else {
		updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "No source digest", noDigest)
	}

	if err := updateLastExecutionTimestamp(dataImportCron); err != nil {
		return res, err
	}

	// Persist only when this pass actually changed the cron.
	if !reflect.DeepEqual(dataImportCron, dataImportCronCopy) {
		if err := r.client.Update(ctx, dataImportCron); err != nil {
			return res, err
		}
	}
	return res, nil
}
462

463
// Returns the current import DV if exists, and the last imported PVC
464
func (r *DataImportCronReconciler) getImportState(ctx context.Context, cron *cdiv1.DataImportCron) (*cdiv1.DataVolume, *corev1.PersistentVolumeClaim, error) {
1✔
465
        imports := cron.Status.CurrentImports
1✔
466
        if len(imports) == 0 {
2✔
467
                return nil, nil, nil
1✔
468
        }
1✔
469

470
        dvName := imports[0].DataVolumeName
1✔
471
        dv := &cdiv1.DataVolume{}
1✔
472
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, dv); err != nil {
2✔
473
                if !k8serrors.IsNotFound(err) {
1✔
474
                        return nil, nil, err
×
475
                }
×
476
                dv = nil
1✔
477
        }
478

479
        pvc := &corev1.PersistentVolumeClaim{}
1✔
480
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, pvc); err != nil {
2✔
481
                if !k8serrors.IsNotFound(err) {
1✔
482
                        return nil, nil, err
×
483
                }
×
484
                pvc = nil
1✔
485
        }
486
        return dv, pvc, nil
1✔
487
}
488

489
// Returns the current import DV if exists, and the last imported PVC
490
func (r *DataImportCronReconciler) getSnapshot(ctx context.Context, cron *cdiv1.DataImportCron) (*snapshotv1.VolumeSnapshot, error) {
1✔
491
        imports := cron.Status.CurrentImports
1✔
492
        if len(imports) == 0 {
2✔
493
                return nil, nil
1✔
494
        }
1✔
495

496
        snapName := imports[0].DataVolumeName
1✔
497
        snapshot := &snapshotv1.VolumeSnapshot{}
1✔
498
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: snapName}, snapshot); err != nil {
2✔
499
                if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
500
                        return nil, err
×
501
                }
×
502
                return nil, nil
1✔
503
        }
504

505
        return snapshot, nil
1✔
506
}
507

508
func (r *DataImportCronReconciler) getDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (*cdiv1.DataSource, error) {
1✔
509
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
510
        dataSource := &cdiv1.DataSource{}
1✔
511
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dataSourceName}, dataSource); err != nil {
2✔
512
                return nil, err
1✔
513
        }
1✔
514
        if dataSource.Labels[common.DataImportCronLabel] != dataImportCron.Name {
1✔
515
                log := r.log.WithName("getCronManagedDataSource")
×
516
                log.Info("DataSource has no DataImportCron label or is not managed by cron, so it is not updated", "name", dataSourceName, "uid", dataSource.UID, "cron", dataImportCron.Name)
×
517
                return nil, ErrNotManagedByCron
×
518
        }
×
519
        return dataSource, nil
1✔
520
}
521

522
func (r *DataImportCronReconciler) updateSource(ctx context.Context, cron *cdiv1.DataImportCron, obj client.Object) error {
1✔
523
        objCopy := obj.DeepCopyObject()
1✔
524
        cc.AddAnnotation(obj, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
525
        r.setDataImportCronResourceLabels(cron, obj)
1✔
526
        if !reflect.DeepEqual(obj, objCopy) {
2✔
527
                if err := r.client.Update(ctx, obj); err != nil {
1✔
528
                        return err
×
529
                }
×
530
        }
531
        return nil
1✔
532
}
533

534
func (r *DataImportCronReconciler) deleteErroneousDataVolume(ctx context.Context, cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) error {
×
535
        log := r.log.WithValues("name", dv.Name).WithValues("uid", dv.UID)
×
536
        if cond := dvc.FindConditionByType(cdiv1.DataVolumeRunning, dv.Status.Conditions); cond != nil {
×
537
                if cond.Status == corev1.ConditionFalse && cond.Reason == common.GenericError {
×
538
                        log.Info("Delete DataVolume and reset DesiredDigest due to error", "message", cond.Message)
×
539
                        // Unlabel the DV before deleting it, to eliminate reconcile before DIC is updated
×
540
                        dv.Labels[common.DataImportCronLabel] = ""
×
541
                        if err := r.client.Update(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
542
                                return err
×
543
                        }
×
544
                        if err := r.client.Delete(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
545
                                return err
×
546
                        }
×
547
                        cron.Status.CurrentImports = nil
×
548
                }
549
        }
550
        return nil
×
551
}
552

553
func (r *DataImportCronReconciler) updateImageStreamDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
554
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
1✔
555
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
556
        if err != nil {
1✔
557
                return err
×
558
        }
×
559
        if regSource.ImageStream == nil {
1✔
560
                return nil
×
561
        }
×
562
        imageStream, imageStreamTag, err := r.getImageStream(ctx, *regSource.ImageStream, dataImportCron.Namespace)
1✔
563
        if err != nil {
2✔
564
                return err
1✔
565
        }
1✔
566
        digest, dockerRef, err := getImageStreamDigest(imageStream, imageStreamTag)
1✔
567
        if err != nil {
2✔
568
                return err
1✔
569
        }
1✔
570
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
571
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
572
                log.Info("Updating DataImportCron", "digest", digest)
1✔
573
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
574
                cc.AddAnnotation(dataImportCron, AnnImageStreamDockerRef, dockerRef)
1✔
575
        }
1✔
576
        return nil
1✔
577
}
578

579
func (r *DataImportCronReconciler) updateDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat) error {
1✔
580
        log := r.log.WithName("updateDataSource")
1✔
581
        dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
582
        if err != nil {
2✔
583
                if k8serrors.IsNotFound(err) {
2✔
584
                        dataSource = r.newDataSource(dataImportCron)
1✔
585
                        if err := r.client.Create(ctx, dataSource); err != nil {
1✔
586
                                return err
×
587
                        }
×
588
                        log.Info("DataSource created", "name", dataSource.Name, "uid", dataSource.UID)
1✔
589
                } else if errors.Is(err, ErrNotManagedByCron) {
×
590
                        return nil
×
591
                } else {
×
592
                        return err
×
593
                }
×
594
        }
595
        dataSourceCopy := dataSource.DeepCopy()
1✔
596
        r.setDataImportCronResourceLabels(dataImportCron, dataSource)
1✔
597

1✔
598
        sourcePVC := dataImportCron.Status.LastImportedPVC
1✔
599
        populateDataSource(format, dataSource, sourcePVC)
1✔
600

1✔
601
        if !reflect.DeepEqual(dataSource, dataSourceCopy) {
2✔
602
                if err := r.client.Update(ctx, dataSource); err != nil {
1✔
603
                        return err
×
604
                }
×
605
        }
606

607
        return nil
1✔
608
}
609

610
func populateDataSource(format cdiv1.DataImportCronSourceFormat, dataSource *cdiv1.DataSource, sourcePVC *cdiv1.DataVolumeSourcePVC) {
1✔
611
        if sourcePVC == nil {
2✔
612
                return
1✔
613
        }
1✔
614

615
        switch format {
1✔
616
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
617
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
618
                        PVC: sourcePVC,
1✔
619
                }
1✔
620
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
621
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
622
                        Snapshot: &cdiv1.DataVolumeSourceSnapshot{
1✔
623
                                Namespace: sourcePVC.Namespace,
1✔
624
                                Name:      sourcePVC.Name,
1✔
625
                        },
1✔
626
                }
1✔
627
        }
628
}
629

630
func updateDataImportCronOnSuccess(dataImportCron *cdiv1.DataImportCron) error {
1✔
631
        if dataImportCron.Status.CurrentImports == nil {
1✔
632
                return errors.Errorf("No CurrentImports in cron %s", dataImportCron.Name)
×
633
        }
×
634
        sourcePVC := &cdiv1.DataVolumeSourcePVC{
1✔
635
                Namespace: dataImportCron.Namespace,
1✔
636
                Name:      dataImportCron.Status.CurrentImports[0].DataVolumeName,
1✔
637
        }
1✔
638
        if dataImportCron.Status.LastImportedPVC == nil || *dataImportCron.Status.LastImportedPVC != *sourcePVC {
2✔
639
                dataImportCron.Status.LastImportedPVC = sourcePVC
1✔
640
                now := metav1.Now()
1✔
641
                dataImportCron.Status.LastImportTimestamp = &now
1✔
642
        }
1✔
643
        return nil
1✔
644
}
645

646
func updateLastExecutionTimestamp(cron *cdiv1.DataImportCron) error {
1✔
647
        lastTimeStr := cron.Annotations[AnnLastCronTime]
1✔
648
        if lastTimeStr == "" {
2✔
649
                return nil
1✔
650
        }
1✔
651
        lastTime, err := time.Parse(time.RFC3339, lastTimeStr)
1✔
652
        if err != nil {
1✔
653
                return err
×
654
        }
×
655
        if ts := cron.Status.LastExecutionTimestamp; ts == nil || ts.Time != lastTime {
2✔
656
                cron.Status.LastExecutionTimestamp = &metav1.Time{Time: lastTime}
1✔
657
        }
1✔
658
        return nil
1✔
659
}
660

661
func (r *DataImportCronReconciler) createImportDataVolume(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
662
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
663
        digest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
664
        if digest == "" {
1✔
665
                return nil
×
666
        }
×
667
        dvName, err := createDvName(dataSourceName, digest)
1✔
668
        if err != nil {
2✔
669
                return err
1✔
670
        }
1✔
671
        dataImportCron.Status.CurrentImports = []cdiv1.ImportStatus{{DataVolumeName: dvName, Digest: digest}}
1✔
672

1✔
673
        sources := []client.Object{&snapshotv1.VolumeSnapshot{}, &corev1.PersistentVolumeClaim{}}
1✔
674
        for _, src := range sources {
2✔
675
                if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dvName}, src); err != nil {
2✔
676
                        if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
677
                                return err
×
678
                        }
×
679
                } else {
1✔
680
                        if err := r.updateSource(ctx, dataImportCron, src); err != nil {
1✔
681
                                return err
×
682
                        }
×
683
                        // If source exists don't create DV
684
                        return nil
1✔
685
                }
686
        }
687

688
        dv := r.newSourceDataVolume(dataImportCron, dvName)
1✔
689
        if err := r.client.Create(ctx, dv); err != nil && !k8serrors.IsAlreadyExists(err) {
1✔
690
                return err
×
691
        }
×
692

693
        return nil
1✔
694
}
695

696
func (r *DataImportCronReconciler) handleCronFormat(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, format cdiv1.DataImportCronSourceFormat, desiredStorageClass *storagev1.StorageClass) error {
1✔
697
        switch format {
1✔
698
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
699
                return nil
1✔
700
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
701
                return r.handleSnapshot(ctx, dataImportCron, pvc, desiredStorageClass)
1✔
702
        default:
×
703
                return fmt.Errorf("unknown source format for snapshot")
×
704
        }
705
}
706

707
func (r *DataImportCronReconciler) handleSnapshot(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, desiredStorageClass *storagev1.StorageClass) error {
1✔
708
        if sc := pvc.Spec.StorageClassName; sc != nil && *sc != desiredStorageClass.Name {
2✔
709
                r.log.Info("Attempt to change storage class, will not try making a snapshot of the old PVC")
1✔
710
                return nil
1✔
711
        }
1✔
712
        storageProfile := &cdiv1.StorageProfile{}
1✔
713
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
714
                return err
×
715
        }
×
716
        className, err := cc.GetSnapshotClassForSmartClone(pvc, &desiredStorageClass.Name, storageProfile.Status.SnapshotClass, r.log, r.client, r.recorder)
1✔
717
        if err != nil {
1✔
718
                return err
×
719
        }
×
720
        desiredSnapshot := &snapshotv1.VolumeSnapshot{
1✔
721
                ObjectMeta: metav1.ObjectMeta{
1✔
722
                        Name:      pvc.Name,
1✔
723
                        Namespace: dataImportCron.Namespace,
1✔
724
                        Labels: map[string]string{
1✔
725
                                common.CDILabelKey:       common.CDILabelValue,
1✔
726
                                common.CDIComponentLabel: "",
1✔
727
                        },
1✔
728
                },
1✔
729
                Spec: snapshotv1.VolumeSnapshotSpec{
1✔
730
                        Source: snapshotv1.VolumeSnapshotSource{
1✔
731
                                PersistentVolumeClaimName: &pvc.Name,
1✔
732
                        },
1✔
733
                        VolumeSnapshotClassName: &className,
1✔
734
                },
1✔
735
        }
1✔
736
        r.setDataImportCronResourceLabels(dataImportCron, desiredSnapshot)
1✔
737
        cc.CopyAllowedLabels(pvc.GetLabels(), desiredSnapshot, false)
1✔
738

1✔
739
        currentSnapshot := &snapshotv1.VolumeSnapshot{}
1✔
740
        if err := r.client.Get(ctx, client.ObjectKeyFromObject(desiredSnapshot), currentSnapshot); err != nil {
2✔
741
                if !k8serrors.IsNotFound(err) {
1✔
742
                        return err
×
743
                }
×
744
                cc.AddAnnotation(desiredSnapshot, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
745
                if pvc.Spec.VolumeMode != nil {
2✔
746
                        cc.AddAnnotation(desiredSnapshot, cc.AnnSourceVolumeMode, string(*pvc.Spec.VolumeMode))
1✔
747
                }
1✔
748
                if err := r.client.Create(ctx, desiredSnapshot); err != nil {
1✔
749
                        return err
×
750
                }
×
751
        } else {
1✔
752
                if cc.IsSnapshotReady(currentSnapshot) {
2✔
753
                        // Clean up DV/PVC as they are not needed anymore
1✔
754
                        r.log.Info("Deleting dv/pvc as snapshot is ready", "name", desiredSnapshot.Name)
1✔
755
                        if err := r.deleteDvPvc(ctx, desiredSnapshot.Name, desiredSnapshot.Namespace); err != nil {
1✔
756
                                return err
×
757
                        }
×
758
                }
759
        }
760

761
        return nil
1✔
762
}
763

764
func (r *DataImportCronReconciler) updateDataImportCronSuccessCondition(dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat, snapshot *snapshotv1.VolumeSnapshot) error {
1✔
765
        dataImportCron.Status.SourceFormat = &format
1✔
766

1✔
767
        switch format {
1✔
768
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
769
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
770
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
771
                if snapshot == nil {
2✔
772
                        // Snapshot create/update will trigger reconcile
1✔
773
                        return nil
1✔
774
                }
1✔
775
                if cc.IsSnapshotReady(snapshot) {
2✔
776
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
777
                } else {
2✔
778
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Snapshot of imported data is progressing", inProgress)
1✔
779
                }
1✔
780
        default:
×
781
                return fmt.Errorf("unknown source format for snapshot")
×
782
        }
783

784
        return nil
1✔
785
}
786

787
func (r *DataImportCronReconciler) getSourceFormat(ctx context.Context, desiredStorageClass *storagev1.StorageClass) (cdiv1.DataImportCronSourceFormat, error) {
1✔
788
        format := cdiv1.DataImportCronSourceFormatPvc
1✔
789
        if desiredStorageClass == nil {
2✔
790
                return format, nil
1✔
791
        }
1✔
792

793
        storageProfile := &cdiv1.StorageProfile{}
1✔
794
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
795
                return format, err
×
796
        }
×
797
        if storageProfile.Status.DataImportCronSourceFormat != nil {
2✔
798
                format = *storageProfile.Status.DataImportCronSourceFormat
1✔
799
        }
1✔
800

801
        return format, nil
1✔
802
}
803

804
func (r *DataImportCronReconciler) garbageCollectOldImports(ctx context.Context, cron *cdiv1.DataImportCron) error {
1✔
805
        if cron.Spec.GarbageCollect != nil && *cron.Spec.GarbageCollect != cdiv1.DataImportCronGarbageCollectOutdated {
1✔
806
                return nil
×
807
        }
×
808
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name})
1✔
809
        if err != nil {
1✔
810
                return err
×
811
        }
×
812

813
        maxImports := defaultImportsToKeepPerCron
1✔
814

1✔
815
        if cron.Spec.ImportsToKeep != nil && *cron.Spec.ImportsToKeep >= 0 {
2✔
816
                maxImports = int(*cron.Spec.ImportsToKeep)
1✔
817
        }
1✔
818

819
        if err := r.garbageCollectPVCs(ctx, cron.Namespace, cron.Name, selector, maxImports); err != nil {
1✔
820
                return err
×
821
        }
×
822
        if err := r.garbageCollectSnapshots(ctx, cron.Namespace, selector, maxImports); err != nil {
1✔
823
                return err
×
824
        }
×
825

826
        return nil
1✔
827
}
828

829
func (r *DataImportCronReconciler) garbageCollectPVCs(ctx context.Context, namespace, cronName string, selector labels.Selector, maxImports int) error {
1✔
830
        pvcList := &corev1.PersistentVolumeClaimList{}
1✔
831

1✔
832
        if err := r.client.List(ctx, pvcList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
833
                return err
×
834
        }
×
835
        if len(pvcList.Items) > maxImports {
2✔
836
                sort.Slice(pvcList.Items, func(i, j int) bool {
2✔
837
                        return pvcList.Items[i].Annotations[AnnLastUseTime] > pvcList.Items[j].Annotations[AnnLastUseTime]
1✔
838
                })
1✔
839
                for _, pvc := range pvcList.Items[maxImports:] {
2✔
840
                        r.log.Info("Deleting dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
841
                        if err := r.deleteDvPvc(ctx, pvc.Name, pvc.Namespace); err != nil {
1✔
842
                                return err
×
843
                        }
×
844
                }
845
        }
846

847
        dvList := &cdiv1.DataVolumeList{}
1✔
848
        if err := r.client.List(ctx, dvList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
849
                return err
×
850
        }
×
851

852
        if len(dvList.Items) > maxImports {
2✔
853
                for _, dv := range dvList.Items {
2✔
854
                        pvc := &corev1.PersistentVolumeClaim{}
1✔
855
                        if err := r.client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: dv.Name}, pvc); err != nil {
1✔
856
                                return err
×
857
                        }
×
858

859
                        if pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
860
                                r.log.Info("Deleting old version dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
861
                                if err := r.deleteDvPvc(ctx, dv.Name, dv.Namespace); err != nil {
1✔
862
                                        return err
×
863
                                }
×
864
                        }
865
                }
866
        }
867

868
        return nil
1✔
869
}
870

871
// deleteDvPvc deletes DV or PVC if DV was GCed
872
func (r *DataImportCronReconciler) deleteDvPvc(ctx context.Context, name, namespace string) error {
1✔
873
        om := metav1.ObjectMeta{Name: name, Namespace: namespace}
1✔
874
        dv := &cdiv1.DataVolume{ObjectMeta: om}
1✔
875
        if err := r.client.Delete(ctx, dv); err == nil || !k8serrors.IsNotFound(err) {
2✔
876
                return err
1✔
877
        }
1✔
878
        pvc := &corev1.PersistentVolumeClaim{ObjectMeta: om}
1✔
879
        if err := r.client.Delete(ctx, pvc); err != nil && !k8serrors.IsNotFound(err) {
1✔
880
                return err
×
881
        }
×
882
        return nil
1✔
883
}
884

885
func (r *DataImportCronReconciler) garbageCollectSnapshots(ctx context.Context, namespace string, selector labels.Selector, maxImports int) error {
1✔
886
        snapList := &snapshotv1.VolumeSnapshotList{}
1✔
887

1✔
888
        if err := r.client.List(ctx, snapList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
889
                if meta.IsNoMatchError(err) {
×
890
                        return nil
×
891
                }
×
892
                return err
×
893
        }
894
        if len(snapList.Items) > maxImports {
1✔
895
                sort.Slice(snapList.Items, func(i, j int) bool {
×
896
                        return snapList.Items[i].Annotations[AnnLastUseTime] > snapList.Items[j].Annotations[AnnLastUseTime]
×
897
                })
×
898
                for _, snap := range snapList.Items[maxImports:] {
×
899
                        r.log.Info("Deleting snapshot", "name", snap.Name, "uid", snap.UID)
×
900
                        if err := r.client.Delete(ctx, &snap); err != nil && !k8serrors.IsNotFound(err) {
×
901
                                return err
×
902
                        }
×
903
                }
904
        }
905

906
        return nil
1✔
907
}
908

909
func (r *DataImportCronReconciler) cleanup(ctx context.Context, cron types.NamespacedName) error {
1✔
910
        // Don't keep alerting over a cron thats being deleted, will get set back to 1 again by reconcile loop if needed.
1✔
911
        metrics.DeleteDataImportCronOutdated(getPrometheusCronLabels(cron.Namespace, cron.Name))
1✔
912

1✔
913
        if err := r.deleteJobs(ctx, cron); err != nil {
1✔
914
                return err
×
915
        }
×
916
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name, common.DataImportCronCleanupLabel: "true"})
1✔
917
        if err != nil {
1✔
918
                return err
×
919
        }
×
920
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: cron.Namespace, LabelSelector: selector}}
1✔
921
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataSource{}, opts); err != nil {
1✔
922
                return err
×
923
        }
×
924
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataVolume{}, opts); err != nil {
1✔
925
                return err
×
926
        }
×
927
        if err := r.client.DeleteAllOf(ctx, &corev1.PersistentVolumeClaim{}, opts); err != nil {
1✔
928
                return err
×
929
        }
×
930
        if err := r.client.DeleteAllOf(ctx, &snapshotv1.VolumeSnapshot{}, opts); cc.IgnoreIsNoMatchError(err) != nil {
1✔
931
                return err
×
932
        }
×
933
        return nil
1✔
934
}
935

936
func (r *DataImportCronReconciler) deleteJobs(ctx context.Context, cron types.NamespacedName) error {
1✔
937
        deleteOpts := client.DeleteOptions{PropagationPolicy: ptr.To[metav1.DeletionPropagation](metav1.DeletePropagationBackground)}
1✔
938
        selector, err := getSelector(map[string]string{common.DataImportCronNsLabel: cron.Namespace, common.DataImportCronLabel: cron.Name})
1✔
939
        if err != nil {
1✔
940
                return err
×
941
        }
×
942
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: r.cdiNamespace, LabelSelector: selector}, DeleteOptions: deleteOpts}
1✔
943
        if err := r.client.DeleteAllOf(ctx, &batchv1.CronJob{}, opts); err != nil {
1✔
944
                return err
×
945
        }
×
946
        if err := r.client.DeleteAllOf(ctx, &batchv1.Job{}, opts); err != nil {
1✔
947
                return err
×
948
        }
×
949

950
        return nil
1✔
951
}
952

953
// NewDataImportCronController creates a new instance of the DataImportCron controller
954
func NewDataImportCronController(mgr manager.Manager, log logr.Logger, importerImage, pullPolicy string, installerLabels map[string]string) (controller.Controller, error) {
×
955
        uncachedClient, err := client.New(mgr.GetConfig(), client.Options{
×
956
                Scheme: mgr.GetScheme(),
×
957
                Mapper: mgr.GetRESTMapper(),
×
958
        })
×
959
        if err != nil {
×
960
                return nil, err
×
961
        }
×
962
        reconciler := &DataImportCronReconciler{
×
963
                client:          mgr.GetClient(),
×
964
                uncachedClient:  uncachedClient,
×
965
                recorder:        mgr.GetEventRecorderFor(dataImportControllerName),
×
966
                scheme:          mgr.GetScheme(),
×
967
                log:             log.WithName(dataImportControllerName),
×
968
                image:           importerImage,
×
969
                pullPolicy:      pullPolicy,
×
970
                cdiNamespace:    util.GetNamespace(),
×
971
                installerLabels: installerLabels,
×
972
        }
×
973
        dataImportCronController, err := controller.New(dataImportControllerName, mgr, controller.Options{
×
974
                MaxConcurrentReconciles: 3,
×
975
                Reconciler:              reconciler,
×
976
        })
×
977
        if err != nil {
×
978
                return nil, err
×
979
        }
×
980
        if err := addDataImportCronControllerWatches(mgr, dataImportCronController); err != nil {
×
981
                return nil, err
×
982
        }
×
983
        log.Info("Initialized DataImportCron controller")
×
984
        return dataImportCronController, nil
×
985
}
986

987
func getCronName(obj client.Object) string {
×
988
        return obj.GetLabels()[common.DataImportCronLabel]
×
989
}
×
990

991
func getCronNs(obj client.Object) string {
×
992
        return obj.GetLabels()[common.DataImportCronNsLabel]
×
993
}
×
994

995
func mapSourceObjectToCron[T client.Object](_ context.Context, obj T) []reconcile.Request {
×
996
        if cronName := getCronName(obj); cronName != "" {
×
997
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: cronName, Namespace: obj.GetNamespace()}}}
×
998
        }
×
999
        return nil
×
1000
}
1001

1002
func addDataImportCronControllerWatches(mgr manager.Manager, c controller.Controller) error {
×
1003
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataImportCron{}, &handler.TypedEnqueueRequestForObject[*cdiv1.DataImportCron]{})); err != nil {
×
1004
                return err
×
1005
        }
×
1006

1007
        mapStorageProfileToCron := func(ctx context.Context, obj *cdiv1.StorageProfile) []reconcile.Request {
×
1008
                // TODO: Get rid of this after at least one version; use indexer on storage class annotation instead
×
1009
                // Otherwise we risk losing the storage profile event
×
1010
                var crons cdiv1.DataImportCronList
×
1011
                if err := mgr.GetClient().List(ctx, &crons); err != nil {
×
1012
                        c.GetLogger().Error(err, "Unable to list DataImportCrons")
×
1013
                        return nil
×
1014
                }
×
1015
                // Storage profiles are 1:1 to storage classes
1016
                scName := obj.GetName()
×
1017
                var reqs []reconcile.Request
×
1018
                for _, cron := range crons.Items {
×
1019
                        dataVolume := cron.Spec.Template
×
1020
                        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
×
1021
                        templateSc, err := cc.GetStorageClassByNameWithVirtFallback(ctx, mgr.GetClient(), explicitScName, dataVolume.Spec.ContentType)
×
1022
                        if err != nil || templateSc == nil {
×
1023
                                c.GetLogger().Error(err, "Unable to get storage class", "templateSc", templateSc)
×
1024
                                return reqs
×
1025
                        }
×
1026
                        if templateSc.Name == scName {
×
1027
                                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: cron.Namespace, Name: cron.Name}})
×
1028
                        }
×
1029
                }
1030
                return reqs
×
1031
        }
1032

1033
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataVolume{},
×
1034
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataVolume](mapSourceObjectToCron),
×
1035
                predicate.TypedFuncs[*cdiv1.DataVolume]{
×
1036
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataVolume]) bool { return false },
×
1037
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataVolume]) bool { return getCronName(e.ObjectNew) != "" },
×
1038
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataVolume]) bool { return getCronName(e.Object) != "" },
×
1039
                },
1040
        )); err != nil {
×
1041
                return err
×
1042
        }
×
1043

1044
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataSource{},
×
1045
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataSource](mapSourceObjectToCron),
×
1046
                predicate.TypedFuncs[*cdiv1.DataSource]{
×
1047
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataSource]) bool { return false },
×
1048
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataSource]) bool { return getCronName(e.ObjectNew) != "" },
×
1049
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataSource]) bool { return getCronName(e.Object) != "" },
×
1050
                },
1051
        )); err != nil {
×
1052
                return err
×
1053
        }
×
1054

1055
        if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{},
×
1056
                handler.TypedEnqueueRequestsFromMapFunc[*corev1.PersistentVolumeClaim](mapSourceObjectToCron),
×
1057
                predicate.TypedFuncs[*corev1.PersistentVolumeClaim]{
×
1058
                        CreateFunc: func(event.TypedCreateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1059
                        UpdateFunc: func(event.TypedUpdateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1060
                        DeleteFunc: func(e event.TypedDeleteEvent[*corev1.PersistentVolumeClaim]) bool { return getCronName(e.Object) != "" },
×
1061
                },
1062
        )); err != nil {
×
1063
                return err
×
1064
        }
×
1065

1066
        if err := addDefaultStorageClassUpdateWatch(mgr, c); err != nil {
×
1067
                return err
×
1068
        }
×
1069

1070
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.StorageProfile{},
×
1071
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.StorageProfile](mapStorageProfileToCron),
×
1072
                predicate.TypedFuncs[*cdiv1.StorageProfile]{
×
1073
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.StorageProfile]) bool { return true },
×
1074
                        DeleteFunc: func(event.TypedDeleteEvent[*cdiv1.StorageProfile]) bool { return false },
×
1075
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.StorageProfile]) bool {
×
1076
                                return e.ObjectOld.Status.DataImportCronSourceFormat != e.ObjectNew.Status.DataImportCronSourceFormat
×
1077
                        },
×
1078
                },
1079
        )); err != nil {
×
1080
                return err
×
1081
        }
×
1082

1083
        mapCronJobToCron := func(_ context.Context, obj *batchv1.CronJob) []reconcile.Request {
×
1084
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Namespace: getCronNs(obj), Name: getCronName(obj)}}}
×
1085
        }
×
1086

1087
        if err := c.Watch(source.Kind(mgr.GetCache(), &batchv1.CronJob{},
×
1088
                handler.TypedEnqueueRequestsFromMapFunc[*batchv1.CronJob](mapCronJobToCron),
×
1089
                predicate.TypedFuncs[*batchv1.CronJob]{
×
1090
                        CreateFunc: func(e event.TypedCreateEvent[*batchv1.CronJob]) bool {
×
1091
                                return getCronName(e.Object) != "" && getCronNs(e.Object) != ""
×
1092
                        },
×
1093
                        DeleteFunc: func(event.TypedDeleteEvent[*batchv1.CronJob]) bool { return false },
×
1094
                        UpdateFunc: func(event.TypedUpdateEvent[*batchv1.CronJob]) bool { return false },
×
1095
                },
1096
        )); err != nil {
×
1097
                return err
×
1098
        }
×
1099

1100
        if err := mgr.GetClient().List(context.TODO(), &snapshotv1.VolumeSnapshotList{}); err != nil {
×
1101
                if meta.IsNoMatchError(err) {
×
1102
                        // Back out if there's no point to attempt watch
×
1103
                        return nil
×
1104
                }
×
1105
                if !cc.IsErrCacheNotStarted(err) {
×
1106
                        return err
×
1107
                }
×
1108
        }
1109
        if err := c.Watch(source.Kind(mgr.GetCache(), &snapshotv1.VolumeSnapshot{},
×
1110
                handler.TypedEnqueueRequestsFromMapFunc[*snapshotv1.VolumeSnapshot](mapSourceObjectToCron),
×
1111
                predicate.TypedFuncs[*snapshotv1.VolumeSnapshot]{
×
1112
                        CreateFunc: func(event.TypedCreateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1113
                        UpdateFunc: func(event.TypedUpdateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1114
                        DeleteFunc: func(e event.TypedDeleteEvent[*snapshotv1.VolumeSnapshot]) bool { return getCronName(e.Object) != "" },
×
1115
                },
1116
        )); err != nil {
×
1117
                return err
×
1118
        }
×
1119

1120
        return nil
×
1121
}
1122

1123
// addDefaultStorageClassUpdateWatch watches for default/virt default storage class updates
1124
func addDefaultStorageClassUpdateWatch(mgr manager.Manager, c controller.Controller) error {
×
1125
        if err := c.Watch(source.Kind(mgr.GetCache(), &storagev1.StorageClass{},
×
1126
                handler.TypedEnqueueRequestsFromMapFunc[*storagev1.StorageClass](
×
1127
                        func(ctx context.Context, obj *storagev1.StorageClass) []reconcile.Request {
×
1128
                                log := c.GetLogger().WithName("DefaultStorageClassUpdateWatch")
×
1129
                                log.Info("Update", "sc", obj.GetName(),
×
1130
                                        "default", obj.GetAnnotations()[cc.AnnDefaultStorageClass] == "true",
×
1131
                                        "defaultVirt", obj.GetAnnotations()[cc.AnnDefaultVirtStorageClass] == "true")
×
1132
                                reqs, err := getReconcileRequestsForDicsWithPendingPvc(ctx, mgr.GetClient())
×
1133
                                if err != nil {
×
1134
                                        log.Error(err, "Failed getting DataImportCrons with pending PVCs")
×
1135
                                }
×
1136
                                return reqs
×
1137
                        },
1138
                ),
1139
                predicate.TypedFuncs[*storagev1.StorageClass]{
1140
                        CreateFunc: func(event.TypedCreateEvent[*storagev1.StorageClass]) bool { return false },
×
1141
                        DeleteFunc: func(event.TypedDeleteEvent[*storagev1.StorageClass]) bool { return false },
×
1142
                        UpdateFunc: func(e event.TypedUpdateEvent[*storagev1.StorageClass]) bool {
×
1143
                                return (e.ObjectNew.Annotations[cc.AnnDefaultStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultStorageClass]) ||
×
1144
                                        (e.ObjectNew.Annotations[cc.AnnDefaultVirtStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultVirtStorageClass])
×
1145
                        },
×
1146
                },
1147
        )); err != nil {
×
1148
                return err
×
1149
        }
×
1150

1151
        return nil
×
1152
}
1153

1154
func getReconcileRequestsForDicsWithPendingPvc(ctx context.Context, c client.Client) ([]reconcile.Request, error) {
×
1155
        dicList := &cdiv1.DataImportCronList{}
×
1156
        if err := c.List(ctx, dicList); err != nil {
×
1157
                return nil, err
×
1158
        }
×
1159
        reqs := []reconcile.Request{}
×
1160
        for _, dic := range dicList.Items {
×
1161
                if cc.GetStorageClassFromDVSpec(&dic.Spec.Template) != nil {
×
1162
                        continue
×
1163
                }
1164

1165
                imports := dic.Status.CurrentImports
×
1166
                if len(imports) == 0 {
×
1167
                        continue
×
1168
                }
1169

1170
                pvcName := imports[0].DataVolumeName
×
1171
                pvc := &corev1.PersistentVolumeClaim{}
×
1172
                if err := c.Get(ctx, types.NamespacedName{Namespace: dic.Namespace, Name: pvcName}, pvc); err != nil {
×
1173
                        if k8serrors.IsNotFound(err) {
×
1174
                                continue
×
1175
                        }
1176
                        return nil, err
×
1177
                }
1178

1179
                if pvc.Status.Phase == corev1.ClaimPending {
×
1180
                        reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Name: dic.Name, Namespace: dic.Namespace}})
×
1181
                }
×
1182
        }
1183

1184
        return reqs, nil
×
1185
}
1186

1187
func (r *DataImportCronReconciler) deleteOutdatedPendingPvc(ctx context.Context, pvc *corev1.PersistentVolumeClaim, desiredStorageClass, cronName string) (bool, error) {
1✔
1188
        if pvc == nil || pvc.Status.Phase != corev1.ClaimPending || pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
1189
                return false, nil
1✔
1190
        }
1✔
1191

1192
        sc := pvc.Spec.StorageClassName
1✔
1193
        if sc == nil || *sc == desiredStorageClass {
2✔
1194
                return false, nil
1✔
1195
        }
1✔
1196

1197
        r.log.Info("Delete pending pvc", "name", pvc.Name, "ns", pvc.Namespace, "sc", *sc)
1✔
1198
        if err := r.client.Delete(ctx, pvc); cc.IgnoreNotFound(err) != nil {
1✔
1199
                return false, err
×
1200
        }
×
1201

1202
        return true, nil
1✔
1203
}
1204

1205
func (r *DataImportCronReconciler) cronJobExistsAndUpdated(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
1206
        cronJob := &batchv1.CronJob{}
1✔
1207
        cronJobKey := types.NamespacedName{Namespace: r.cdiNamespace, Name: GetCronJobName(cron)}
1✔
1208
        if err := r.client.Get(ctx, cronJobKey, cronJob); err != nil {
2✔
1209
                return false, cc.IgnoreNotFound(err)
1✔
1210
        }
1✔
1211

1212
        cronJobCopy := cronJob.DeepCopy()
1✔
1213
        if err := r.initCronJob(cron, cronJobCopy); err != nil {
1✔
1214
                return false, err
×
1215
        }
×
1216

1217
        if !reflect.DeepEqual(cronJob, cronJobCopy) {
2✔
1218
                r.log.Info("Updating CronJob", "name", cronJob.GetName())
1✔
1219
                if err := r.client.Update(ctx, cronJobCopy); err != nil {
1✔
1220
                        return false, cc.IgnoreNotFound(err)
×
1221
                }
×
1222
        }
1223
        return true, nil
1✔
1224
}
1225

1226
func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batchv1.CronJob, error) {
1✔
1227
        cronJob := &batchv1.CronJob{
1✔
1228
                ObjectMeta: metav1.ObjectMeta{
1✔
1229
                        Name:      GetCronJobName(cron),
1✔
1230
                        Namespace: r.cdiNamespace,
1✔
1231
                },
1✔
1232
        }
1✔
1233
        if err := r.initCronJob(cron, cronJob); err != nil {
1✔
1234
                return nil, err
×
1235
        }
×
1236
        return cronJob, nil
1✔
1237
}
1238

1239
// InitPollerPodSpec inits poller PodSpec
1240
func InitPollerPodSpec(c client.Client, cron *cdiv1.DataImportCron, podSpec *corev1.PodSpec, image string, pullPolicy corev1.PullPolicy, log logr.Logger) error {
1✔
1241
        regSource, err := getCronRegistrySource(cron)
1✔
1242
        if err != nil {
1✔
1243
                return err
×
1244
        }
×
1245
        if regSource.URL == nil {
1✔
1246
                return errors.Errorf("No URL source in cron %s", cron.Name)
×
1247
        }
×
1248
        cdiConfig := &cdiv1.CDIConfig{}
1✔
1249
        if err := c.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiConfig); err != nil {
1✔
1250
                return err
×
1251
        }
×
1252
        insecureTLS, err := IsInsecureTLS(*regSource.URL, cdiConfig, log)
1✔
1253
        if err != nil {
1✔
1254
                return err
×
1255
        }
×
1256
        container := corev1.Container{
1✔
1257
                Name:  "cdi-source-update-poller",
1✔
1258
                Image: image,
1✔
1259
                Command: []string{
1✔
1260
                        "/usr/bin/cdi-source-update-poller",
1✔
1261
                        "-ns", cron.Namespace,
1✔
1262
                        "-cron", cron.Name,
1✔
1263
                        "-url", *regSource.URL,
1✔
1264
                },
1✔
1265
                ImagePullPolicy:          pullPolicy,
1✔
1266
                TerminationMessagePath:   corev1.TerminationMessagePathDefault,
1✔
1267
                TerminationMessagePolicy: corev1.TerminationMessageReadFile,
1✔
1268
        }
1✔
1269

1✔
1270
        var volumes []corev1.Volume
1✔
1271
        hasCertConfigMap := regSource.CertConfigMap != nil && *regSource.CertConfigMap != ""
1✔
1272
        if hasCertConfigMap {
1✔
1273
                vm := corev1.VolumeMount{
×
1274
                        Name:      CertVolName,
×
1275
                        MountPath: common.ImporterCertDir,
×
1276
                }
×
1277
                container.VolumeMounts = append(container.VolumeMounts, vm)
×
1278
                container.Command = append(container.Command, "-certdir", common.ImporterCertDir)
×
1279
                volumes = append(volumes, createConfigMapVolume(CertVolName, *regSource.CertConfigMap))
×
1280
        }
×
1281

1282
        if volName, _ := GetImportProxyConfig(cdiConfig, common.ImportProxyConfigMapName); volName != "" {
2✔
1283
                vm := corev1.VolumeMount{
1✔
1284
                        Name:      ProxyCertVolName,
1✔
1285
                        MountPath: common.ImporterProxyCertDir,
1✔
1286
                }
1✔
1287
                container.VolumeMounts = append(container.VolumeMounts, vm)
1✔
1288
                volumes = append(volumes, createConfigMapVolume(ProxyCertVolName, volName))
1✔
1289
        }
1✔
1290

1291
        if regSource.SecretRef != nil && *regSource.SecretRef != "" {
1✔
1292
                container.Env = append(container.Env,
×
1293
                        corev1.EnvVar{
×
1294
                                Name: common.ImporterAccessKeyID,
×
1295
                                ValueFrom: &corev1.EnvVarSource{
×
1296
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1297
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1298
                                                        Name: *regSource.SecretRef,
×
1299
                                                },
×
1300
                                                Key: common.KeyAccess,
×
1301
                                        },
×
1302
                                },
×
1303
                        },
×
1304
                        corev1.EnvVar{
×
1305
                                Name: common.ImporterSecretKey,
×
1306
                                ValueFrom: &corev1.EnvVarSource{
×
1307
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1308
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1309
                                                        Name: *regSource.SecretRef,
×
1310
                                                },
×
1311
                                                Key: common.KeySecret,
×
1312
                                        },
×
1313
                                },
×
1314
                        },
×
1315
                )
×
1316
        }
×
1317

1318
        addEnvVar := func(varName, value string) {
2✔
1319
                container.Env = append(container.Env, corev1.EnvVar{Name: varName, Value: value})
1✔
1320
        }
1✔
1321

1322
        if insecureTLS {
1✔
1323
                addEnvVar(common.InsecureTLSVar, "true")
×
1324
        }
×
1325

1326
        addEnvVarFromImportProxyConfig := func(varName string) {
2✔
1327
                if value, err := GetImportProxyConfig(cdiConfig, varName); err == nil {
2✔
1328
                        addEnvVar(varName, value)
1✔
1329
                }
1✔
1330
        }
1331

1332
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTP)
1✔
1333
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTPS)
1✔
1334
        addEnvVarFromImportProxyConfig(common.ImportProxyNoProxy)
1✔
1335

1✔
1336
        imagePullSecrets, err := cc.GetImagePullSecrets(c)
1✔
1337
        if err != nil {
1✔
1338
                return err
×
1339
        }
×
1340
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(context.TODO(), c)
1✔
1341
        if err != nil {
1✔
1342
                return err
×
1343
        }
×
1344

1345
        podSpec.RestartPolicy = corev1.RestartPolicyNever
1✔
1346
        podSpec.TerminationGracePeriodSeconds = ptr.To[int64](0)
1✔
1347
        podSpec.Containers = []corev1.Container{container}
1✔
1348
        podSpec.ServiceAccountName = common.CronJobServiceAccountName
1✔
1349
        podSpec.Volumes = volumes
1✔
1350
        podSpec.ImagePullSecrets = imagePullSecrets
1✔
1351
        podSpec.NodeSelector = workloadNodePlacement.NodeSelector
1✔
1352
        podSpec.Tolerations = workloadNodePlacement.Tolerations
1✔
1353
        podSpec.Affinity = workloadNodePlacement.Affinity
1✔
1354

1✔
1355
        cc.SetRestrictedSecurityContext(podSpec)
1✔
1356

1✔
1357
        return nil
1✔
1358
}
1359

1360
func (r *DataImportCronReconciler) initCronJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) error {
1✔
1361
        cronJobSpec := &cronJob.Spec
1✔
1362
        cronJobSpec.Schedule = cron.Spec.Schedule
1✔
1363
        cronJobSpec.ConcurrencyPolicy = batchv1.ForbidConcurrent
1✔
1364
        cronJobSpec.SuccessfulJobsHistoryLimit = ptr.To[int32](1)
1✔
1365
        cronJobSpec.FailedJobsHistoryLimit = ptr.To[int32](1)
1✔
1366

1✔
1367
        jobSpec := &cronJobSpec.JobTemplate.Spec
1✔
1368
        jobSpec.BackoffLimit = ptr.To[int32](2)
1✔
1369
        jobSpec.TTLSecondsAfterFinished = ptr.To[int32](10)
1✔
1370

1✔
1371
        podSpec := &jobSpec.Template.Spec
1✔
1372
        if err := InitPollerPodSpec(r.client, cron, podSpec, r.image, corev1.PullPolicy(r.pullPolicy), r.log); err != nil {
1✔
1373
                return err
×
1374
        }
×
1375
        if err := r.setJobCommon(cron, cronJob); err != nil {
1✔
1376
                return err
×
1377
        }
×
1378
        return nil
1✔
1379
}
1380

1381
func (r *DataImportCronReconciler) newInitialJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) (*batchv1.Job, error) {
1✔
1382
        job := &batchv1.Job{
1✔
1383
                ObjectMeta: metav1.ObjectMeta{
1✔
1384
                        Name:      GetInitialJobName(cron),
1✔
1385
                        Namespace: cronJob.Namespace,
1✔
1386
                },
1✔
1387
                Spec: cronJob.Spec.JobTemplate.Spec,
1✔
1388
        }
1✔
1389
        if err := r.setJobCommon(cron, job); err != nil {
1✔
1390
                return nil, err
×
1391
        }
×
1392
        return job, nil
1✔
1393
}
1394

1395
func (r *DataImportCronReconciler) setJobCommon(cron *cdiv1.DataImportCron, obj metav1.Object) error {
1✔
1396
        if err := operator.SetOwnerRuntime(r.uncachedClient, obj); err != nil {
1✔
1397
                return err
×
1398
        }
×
1399
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1400
        labels := obj.GetLabels()
1✔
1401
        labels[common.DataImportCronNsLabel] = cron.Namespace
1✔
1402
        labels[common.DataImportCronLabel] = cron.Name
1✔
1403
        obj.SetLabels(labels)
1✔
1404
        return nil
1✔
1405
}
1406

1407
func (r *DataImportCronReconciler) newSourceDataVolume(cron *cdiv1.DataImportCron, dataVolumeName string) *cdiv1.DataVolume {
1✔
1408
        var digestedURL string
1✔
1409
        dv := cron.Spec.Template.DeepCopy()
1✔
1410
        if isURLSource(cron) {
2✔
1411
                digestedURL = untagDigestedDockerURL(*dv.Spec.Source.Registry.URL + "@" + cron.Annotations[AnnSourceDesiredDigest])
1✔
1412
        } else if isImageStreamSource(cron) {
3✔
1413
                // No way to import image stream by name when we want specific digest, so we use its docker reference
1✔
1414
                digestedURL = "docker://" + cron.Annotations[AnnImageStreamDockerRef]
1✔
1415
                dv.Spec.Source.Registry.ImageStream = nil
1✔
1416
        }
1✔
1417
        dv.Spec.Source.Registry.URL = &digestedURL
1✔
1418
        dv.Name = dataVolumeName
1✔
1419
        dv.Namespace = cron.Namespace
1✔
1420
        r.setDataImportCronResourceLabels(cron, dv)
1✔
1421
        cc.AddAnnotation(dv, cc.AnnImmediateBinding, "true")
1✔
1422
        cc.AddAnnotation(dv, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
1423
        passCronAnnotationToDv(cron, dv, cc.AnnPodRetainAfterCompletion)
1✔
1424

1✔
1425
        for _, defaultInstanceTypeLabel := range cc.DefaultInstanceTypeLabels {
2✔
1426
                passCronLabelToDv(cron, dv, defaultInstanceTypeLabel)
1✔
1427
        }
1✔
1428

1429
        passCronLabelToDv(cron, dv, cc.LabelDynamicCredentialSupport)
1✔
1430

1✔
1431
        return dv
1✔
1432
}
1433

1434
func (r *DataImportCronReconciler) setDataImportCronResourceLabels(cron *cdiv1.DataImportCron, obj metav1.Object) {
1✔
1435
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1436
        labels := obj.GetLabels()
1✔
1437
        labels[common.DataImportCronLabel] = cron.Name
1✔
1438
        if cron.Spec.RetentionPolicy != nil && *cron.Spec.RetentionPolicy == cdiv1.DataImportCronRetainNone {
2✔
1439
                labels[common.DataImportCronCleanupLabel] = "true"
1✔
1440
        }
1✔
1441
        obj.SetLabels(labels)
1✔
1442
}
1443

1444
func untagDigestedDockerURL(dockerURL string) string {
1✔
1445
        if u, err := url.Parse(dockerURL); err == nil {
2✔
1446
                url := u.Host + u.Path
1✔
1447
                subs := reference.ReferenceRegexp.FindStringSubmatch(url)
1✔
1448
                // Check for tag
1✔
1449
                if len(subs) > 2 && len(subs[2]) > 0 {
2✔
1450
                        if untaggedRef, err := reference.ParseDockerRef(url); err == nil {
2✔
1451
                                return u.Scheme + "://" + untaggedRef.String()
1✔
1452
                        }
1✔
1453
                }
1454
        }
1455
        return dockerURL
1✔
1456
}
1457

1458
func passCronLabelToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1459
        if val := cron.Labels[ann]; val != "" {
2✔
1460
                cc.AddLabel(dv, ann, val)
1✔
1461
        }
1✔
1462
}
1463

1464
func passCronAnnotationToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1465
        if val := cron.Annotations[ann]; val != "" {
1✔
1466
                cc.AddAnnotation(dv, ann, val)
×
1467
        }
×
1468
}
1469

1470
func (r *DataImportCronReconciler) newDataSource(cron *cdiv1.DataImportCron) *cdiv1.DataSource {
1✔
1471
        dataSource := &cdiv1.DataSource{
1✔
1472
                ObjectMeta: metav1.ObjectMeta{
1✔
1473
                        Name:      cron.Spec.ManagedDataSource,
1✔
1474
                        Namespace: cron.Namespace,
1✔
1475
                },
1✔
1476
        }
1✔
1477
        util.SetRecommendedLabels(dataSource, r.installerLabels, common.CDIControllerName)
1✔
1478
        dataSource.Labels[common.DataImportCronLabel] = cron.Name
1✔
1479
        return dataSource
1✔
1480
}
1✔
1481

1482
// Create DataVolume name based on the DataSource name + prefix of the digest sha256
1483
func createDvName(prefix, digest string) (string, error) {
1✔
1484
        fromIdx := len(digestPrefix)
1✔
1485
        toIdx := fromIdx + digestDvNameSuffixLength
1✔
1486
        if !strings.HasPrefix(digest, digestPrefix) {
2✔
1487
                return "", errors.Errorf("Digest has no supported prefix")
1✔
1488
        }
1✔
1489
        if len(digest) < toIdx {
2✔
1490
                return "", errors.Errorf("Digest is too short")
1✔
1491
        }
1✔
1492
        return naming.GetResourceName(prefix, digest[fromIdx:toIdx]), nil
1✔
1493
}
1494

1495
// GetCronJobName get CronJob name based on cron name and UID
1496
func GetCronJobName(cron *cdiv1.DataImportCron) string {
1✔
1497
        return naming.GetResourceName(cron.Name, string(cron.UID)[:cronJobUIDSuffixLength])
1✔
1498
}
1✔
1499

1500
// GetInitialJobName get initial job name based on cron name and UID
1501
func GetInitialJobName(cron *cdiv1.DataImportCron) string {
1✔
1502
        return naming.GetResourceName("initial-job", GetCronJobName(cron))
1✔
1503
}
1✔
1504

1505
func getSelector(matchLabels map[string]string) (labels.Selector, error) {
1✔
1506
        return metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: matchLabels})
1✔
1507
}
1✔
1508

1509
func inferVolumeModeForSnapshot(ctx context.Context, client client.Client, cron *cdiv1.DataImportCron) (*corev1.PersistentVolumeMode, error) {
1✔
1510
        dv := &cron.Spec.Template
1✔
1511

1✔
1512
        if explicitVolumeMode := getVolumeModeFromDVSpec(dv); explicitVolumeMode != nil {
1✔
1513
                return explicitVolumeMode, nil
×
1514
        }
×
1515

1516
        accessModes := getAccessModesFromDVSpec(dv)
1✔
1517
        inferredPvc := &corev1.PersistentVolumeClaim{
1✔
1518
                Spec: corev1.PersistentVolumeClaimSpec{
1✔
1519
                        StorageClassName: cc.GetStorageClassFromDVSpec(dv),
1✔
1520
                        AccessModes:      accessModes,
1✔
1521
                        VolumeMode:       ptr.To(cdiv1.PersistentVolumeFromStorageProfile),
1✔
1522
                        Resources: corev1.VolumeResourceRequirements{
1✔
1523
                                Requests: corev1.ResourceList{
1✔
1524
                                        // Doesn't matter
1✔
1525
                                        corev1.ResourceStorage: resource.MustParse("1Gi"),
1✔
1526
                                },
1✔
1527
                        },
1✔
1528
                },
1✔
1529
        }
1✔
1530
        if err := dvc.RenderPvc(ctx, client, inferredPvc); err != nil {
1✔
1531
                return nil, err
×
1532
        }
×
1533

1534
        return inferredPvc.Spec.VolumeMode, nil
1✔
1535
}
1536

1537
// getVolumeModeFromDVSpec returns the volume mode from DataVolume PVC or Storage spec
1538
func getVolumeModeFromDVSpec(dv *cdiv1.DataVolume) *corev1.PersistentVolumeMode {
1✔
1539
        if dv.Spec.PVC != nil {
1✔
1540
                return dv.Spec.PVC.VolumeMode
×
1541
        }
×
1542

1543
        if dv.Spec.Storage != nil {
2✔
1544
                return dv.Spec.Storage.VolumeMode
1✔
1545
        }
1✔
1546

1547
        return nil
×
1548
}
1549

1550
// getAccessModesFromDVSpec returns the access modes from DataVolume PVC or Storage spec
1551
func getAccessModesFromDVSpec(dv *cdiv1.DataVolume) []corev1.PersistentVolumeAccessMode {
1✔
1552
        if dv.Spec.PVC != nil {
1✔
1553
                return dv.Spec.PVC.AccessModes
×
1554
        }
×
1555

1556
        if dv.Spec.Storage != nil {
2✔
1557
                return dv.Spec.Storage.AccessModes
1✔
1558
        }
1✔
1559

1560
        return nil
×
1561
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc