• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #4940

03 Sep 2024 08:10PM UTC coverage: 59.171% (+0.004%) from 59.167%
#4940

push

travis-ci

web-flow
Bump builder to latest (#3419)

We've had a few PRs merge to the builder dockerfile,
let's start using the new one.

Signed-off-by: Alex Kalenyuk <akalenyu@redhat.com>

16640 of 28122 relevant lines covered (59.17%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.68
/pkg/controller/dataimportcron-controller.go
1
/*
2
Copyright 2021 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package controller
18

19
import (
20
        "context"
21
        "fmt"
22
        "net/url"
23
        "reflect"
24
        "sort"
25
        "strings"
26
        "time"
27

28
        "github.com/containers/image/v5/docker/reference"
29
        "github.com/go-logr/logr"
30
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
31
        imagev1 "github.com/openshift/api/image/v1"
32
        "github.com/pkg/errors"
33
        cronexpr "github.com/robfig/cron/v3"
34

35
        batchv1 "k8s.io/api/batch/v1"
36
        corev1 "k8s.io/api/core/v1"
37
        storagev1 "k8s.io/api/storage/v1"
38
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
39
        "k8s.io/apimachinery/pkg/api/meta"
40
        "k8s.io/apimachinery/pkg/api/resource"
41
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
42
        "k8s.io/apimachinery/pkg/labels"
43
        "k8s.io/apimachinery/pkg/runtime"
44
        "k8s.io/apimachinery/pkg/types"
45
        "k8s.io/client-go/tools/record"
46
        openapicommon "k8s.io/kube-openapi/pkg/common"
47
        "k8s.io/utils/ptr"
48

49
        "sigs.k8s.io/controller-runtime/pkg/client"
50
        "sigs.k8s.io/controller-runtime/pkg/controller"
51
        "sigs.k8s.io/controller-runtime/pkg/event"
52
        "sigs.k8s.io/controller-runtime/pkg/handler"
53
        "sigs.k8s.io/controller-runtime/pkg/manager"
54
        "sigs.k8s.io/controller-runtime/pkg/predicate"
55
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
56
        "sigs.k8s.io/controller-runtime/pkg/source"
57

58
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
59
        "kubevirt.io/containerized-data-importer/pkg/common"
60
        cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
61
        dvc "kubevirt.io/containerized-data-importer/pkg/controller/datavolume"
62
        metrics "kubevirt.io/containerized-data-importer/pkg/monitoring/metrics/cdi-controller"
63
        "kubevirt.io/containerized-data-importer/pkg/operator"
64
        "kubevirt.io/containerized-data-importer/pkg/util"
65
        "kubevirt.io/containerized-data-importer/pkg/util/naming"
66
)
67

68
const (
69
        // ErrDataSourceAlreadyManaged is the event reason recorded on a DataImportCron whose DataSource is already managed by another DataImportCron
70
        ErrDataSourceAlreadyManaged = "ErrDataSourceAlreadyManaged"
71
        // MessageDataSourceAlreadyManaged is the format string of that event's message; it takes the DataSource name and the name of the DataImportCron managing it
72
        MessageDataSourceAlreadyManaged = "DataSource %s is already managed by DataImportCron %s"
73
)
74

75
// DataImportCronReconciler reconciles DataImportCron objects.
// NOTE(review): fields without a comment are not exercised by the methods reviewed here; confirm their role at their use sites.
76
type DataImportCronReconciler struct {
77
        // client performs the Get/Create/Update/Delete calls made by the reconciler methods
        client          client.Client
78
        uncachedClient  client.Client
79
        // recorder emits events on the reconciled DataImportCron
        recorder        record.EventRecorder
80
        scheme          *runtime.Scheme
81
        // log is the base logger; methods derive named/valued loggers from it
        log             logr.Logger
82
        image           string
83
        pullPolicy      string
84
        cdiNamespace    string
85
        installerLabels map[string]string
86
}
87

88
const (
89
        // AnnSourceDesiredDigest is the digest of the pending updated image
90
        AnnSourceDesiredDigest = cc.AnnAPIGroup + "/storage.import.sourceDesiredDigest"
91
        // AnnImageStreamDockerRef is the ImageStream Docker reference, stored together with the desired digest
92
        AnnImageStreamDockerRef = cc.AnnAPIGroup + "/storage.import.imageStreamDockerRef"
93
        // AnnNextCronTime is the next time stamp (RFC3339) which satisfies the cron expression
94
        AnnNextCronTime = cc.AnnAPIGroup + "/storage.import.nextCronTime"
95
        // AnnLastCronTime is the cron last execution time stamp (RFC3339)
96
        AnnLastCronTime = cc.AnnAPIGroup + "/storage.import.lastCronTime"
97
        // AnnLastUseTime is the last use time stamp (UTC, RFC3339Nano) set on an import source, i.e. a PVC or a snapshot
98
        AnnLastUseTime = cc.AnnAPIGroup + "/storage.import.lastUseTime"
99
        // AnnStorageClass is the cron DV's storage class; it is kept on the DataImportCron to detect storage class changes
100
        AnnStorageClass = cc.AnnAPIGroup + "/storage.import.storageClass"
101

102
        // NOTE(review): meanings of the unexported settings below are inferred from their names — confirm at their use sites
        dataImportControllerName    = "dataimportcron-controller"
103
        digestPrefix                = "sha256:"
104
        digestDvNameSuffixLength    = 12
105
        cronJobUIDSuffixLength      = 8
106
        defaultImportsToKeepPerCron = 3
107
)
108

109
// ErrNotManagedByCron is returned by getDataSource when the DataSource's DataImportCron label does not name the requesting DataImportCron.
var ErrNotManagedByCron = errors.New("DataSource is not managed by this DataImportCron")
110

111
// Reconcile loop for DataImportCronReconciler
112
func (r *DataImportCronReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
1✔
113
        dataImportCron := &cdiv1.DataImportCron{}
1✔
114
        if err := r.client.Get(ctx, req.NamespacedName, dataImportCron); cc.IgnoreNotFound(err) != nil {
2✔
115
                return reconcile.Result{}, err
1✔
116
        } else if err != nil || dataImportCron.DeletionTimestamp != nil {
3✔
117
                err := r.cleanup(ctx, req.NamespacedName)
1✔
118
                return reconcile.Result{}, err
1✔
119
        }
1✔
120
        shouldReconcile, err := r.shouldReconcileCron(ctx, dataImportCron)
1✔
121
        if !shouldReconcile || err != nil {
1✔
122
                return reconcile.Result{}, err
×
123
        }
×
124

125
        if err := r.initCron(ctx, dataImportCron); err != nil {
1✔
126
                return reconcile.Result{}, err
×
127
        }
×
128

129
        return r.update(ctx, dataImportCron)
1✔
130
}
131

132
func (r *DataImportCronReconciler) shouldReconcileCron(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
133
        dataSource := &cdiv1.DataSource{}
1✔
134
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: cron.Spec.ManagedDataSource}, dataSource); err != nil {
2✔
135
                if k8serrors.IsNotFound(err) {
2✔
136
                        return true, nil
1✔
137
                }
1✔
138
                return false, err
×
139
        }
140
        dataSourceCronLabel := dataSource.Labels[common.DataImportCronLabel]
1✔
141
        if dataSourceCronLabel == cron.Name || dataSourceCronLabel == "" {
2✔
142
                return true, nil
1✔
143
        }
1✔
144
        otherCron := &cdiv1.DataImportCron{}
1✔
145
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dataSourceCronLabel}, otherCron); err != nil {
2✔
146
                if k8serrors.IsNotFound(err) {
2✔
147
                        return true, nil
1✔
148
                }
1✔
149
                return false, err
×
150
        }
151
        if otherCron.Spec.ManagedDataSource == dataSource.Name {
2✔
152
                msg := fmt.Sprintf(MessageDataSourceAlreadyManaged, dataSource.Name, otherCron.Name)
1✔
153
                r.recorder.Event(cron, corev1.EventTypeWarning, ErrDataSourceAlreadyManaged, msg)
1✔
154
                r.log.V(3).Info(msg)
1✔
155
                return false, nil
1✔
156
        }
1✔
157
        return true, nil
×
158
}
159

160
func (r *DataImportCronReconciler) initCron(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
161
        if dataImportCron.Spec.Schedule == "" {
2✔
162
                return nil
1✔
163
        }
1✔
164
        if isImageStreamSource(dataImportCron) {
2✔
165
                if dataImportCron.Annotations[AnnNextCronTime] == "" {
2✔
166
                        cc.AddAnnotation(dataImportCron, AnnNextCronTime, time.Now().Format(time.RFC3339))
1✔
167
                }
1✔
168
                return nil
1✔
169
        }
170
        if !isURLSource(dataImportCron) {
1✔
171
                return nil
×
172
        }
×
173
        exists, err := r.cronJobExistsAndUpdated(ctx, dataImportCron)
1✔
174
        if exists || err != nil {
2✔
175
                return err
1✔
176
        }
1✔
177
        cronJob, err := r.newCronJob(dataImportCron)
1✔
178
        if err != nil {
1✔
179
                return err
×
180
        }
×
181
        if err := r.client.Create(ctx, cronJob); err != nil {
1✔
182
                return err
×
183
        }
×
184
        job, err := r.newInitialJob(dataImportCron, cronJob)
1✔
185
        if err != nil {
1✔
186
                return err
×
187
        }
×
188
        if err := r.client.Create(ctx, job); err != nil {
1✔
189
                return err
×
190
        }
×
191
        return nil
1✔
192
}
193

194
func (r *DataImportCronReconciler) getImageStream(ctx context.Context, imageStreamName, imageStreamNamespace string) (*imagev1.ImageStream, string, error) {
1✔
195
        if imageStreamName == "" || imageStreamNamespace == "" {
1✔
196
                return nil, "", errors.Errorf("Missing ImageStream name or namespace")
×
197
        }
×
198
        imageStream := &imagev1.ImageStream{}
1✔
199
        name, tag, err := splitImageStreamName(imageStreamName)
1✔
200
        if err != nil {
2✔
201
                return nil, "", err
1✔
202
        }
1✔
203
        imageStreamNamespacedName := types.NamespacedName{
1✔
204
                Namespace: imageStreamNamespace,
1✔
205
                Name:      name,
1✔
206
        }
1✔
207
        if err := r.client.Get(ctx, imageStreamNamespacedName, imageStream); err != nil {
2✔
208
                return nil, "", err
1✔
209
        }
1✔
210
        return imageStream, tag, nil
1✔
211
}
212

213
func getImageStreamDigest(imageStream *imagev1.ImageStream, imageStreamTag string) (string, string, error) {
1✔
214
        if imageStream == nil {
1✔
215
                return "", "", errors.Errorf("No ImageStream")
×
216
        }
×
217
        tags := imageStream.Status.Tags
1✔
218
        if len(tags) == 0 {
1✔
219
                return "", "", errors.Errorf("ImageStream %s has no tags", imageStream.Name)
×
220
        }
×
221

222
        tagIdx := 0
1✔
223
        if len(imageStreamTag) > 0 {
2✔
224
                tagIdx = -1
1✔
225
                for i, tag := range tags {
2✔
226
                        if tag.Tag == imageStreamTag {
2✔
227
                                tagIdx = i
1✔
228
                                break
1✔
229
                        }
230
                }
231
        }
232
        if tagIdx == -1 {
2✔
233
                return "", "", errors.Errorf("ImageStream %s has no tag %s", imageStream.Name, imageStreamTag)
1✔
234
        }
1✔
235

236
        if len(tags[tagIdx].Items) == 0 {
2✔
237
                return "", "", errors.Errorf("ImageStream %s tag %s has no items", imageStream.Name, imageStreamTag)
1✔
238
        }
1✔
239
        return tags[tagIdx].Items[0].Image, tags[tagIdx].Items[0].DockerImageReference, nil
1✔
240
}
241

242
func splitImageStreamName(imageStreamName string) (string, string, error) {
1✔
243
        if subs := strings.Split(imageStreamName, ":"); len(subs) == 1 {
2✔
244
                return imageStreamName, "", nil
1✔
245
        } else if len(subs) == 2 && len(subs[0]) > 0 && len(subs[1]) > 0 {
3✔
246
                return subs[0], subs[1], nil
1✔
247
        }
1✔
248
        return "", "", errors.Errorf("Illegal ImageStream name %s", imageStreamName)
1✔
249
}
250

251
func (r *DataImportCronReconciler) pollImageStreamDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
252
        if nextTimeStr := dataImportCron.Annotations[AnnNextCronTime]; nextTimeStr != "" {
2✔
253
                nextTime, err := time.Parse(time.RFC3339, nextTimeStr)
1✔
254
                if err != nil {
1✔
255
                        return reconcile.Result{}, err
×
256
                }
×
257
                if nextTime.Before(time.Now()) {
2✔
258
                        if err := r.updateImageStreamDesiredDigest(ctx, dataImportCron); err != nil {
2✔
259
                                return reconcile.Result{}, err
1✔
260
                        }
1✔
261
                }
262
        }
263
        return r.setNextCronTime(dataImportCron)
1✔
264
}
265

266
func (r *DataImportCronReconciler) setNextCronTime(dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
267
        now := time.Now()
1✔
268
        expr, err := cronexpr.ParseStandard(dataImportCron.Spec.Schedule)
1✔
269
        if err != nil {
1✔
270
                return reconcile.Result{}, err
×
271
        }
×
272
        nextTime := expr.Next(now)
1✔
273
        requeueAfter := nextTime.Sub(now)
1✔
274
        res := reconcile.Result{Requeue: true, RequeueAfter: requeueAfter}
1✔
275
        cc.AddAnnotation(dataImportCron, AnnNextCronTime, nextTime.Format(time.RFC3339))
1✔
276
        return res, err
1✔
277
}
278

279
func isImageStreamSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
280
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
281
        return err == nil && regSource.ImageStream != nil
1✔
282
}
1✔
283

284
func isURLSource(dataImportCron *cdiv1.DataImportCron) bool {
1✔
285
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
286
        return err == nil && regSource.URL != nil
1✔
287
}
1✔
288

289
func getCronRegistrySource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourceRegistry, error) {
1✔
290
        source := cron.Spec.Template.Spec.Source
1✔
291
        if source == nil || source.Registry == nil {
1✔
292
                return nil, errors.Errorf("Cron with no registry source %s", cron.Name)
×
293
        }
×
294
        return source.Registry, nil
1✔
295
}
296

297
// update runs one reconcile pass over the cron's current import.
// It loads the import DataVolume/PVC and snapshot, reacts to a changed desired
// storage class, sets the Progressing condition according to the import state,
// updates the managed DataSource, polls the ImageStream digest for scheduled
// ImageStream sources, creates a new import DataVolume when the desired digest
// differs from the current import, and finally persists the cron if any of the
// above modified it. The returned Result carries the poll requeue interval, or
// a one-second requeue after a storage class change or a leftover snapshot.
func (r *DataImportCronReconciler) update(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
1✔
298
        res := reconcile.Result{}
1✔
299

1✔
300
        dv, pvc, err := r.getImportState(ctx, dataImportCron)
1✔
301
        if err != nil {
1✔
302
                return res, err
×
303
        }
×
304

305
        // Copy taken before any mutation; compared at the end to decide whether the cron needs an Update call
        dataImportCronCopy := dataImportCron.DeepCopy()
1✔
306
        imports := dataImportCron.Status.CurrentImports
1✔
307
        importSucceeded := false
1✔
308

1✔
309
        dataVolume := dataImportCron.Spec.Template
1✔
310
        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
1✔
311
        desiredStorageClass, err := cc.GetStorageClassByNameWithVirtFallback(ctx, r.client, explicitScName, dataVolume.Spec.ContentType)
1✔
312
        if err != nil {
1✔
313
                return res, err
×
314
        }
×
315
        if desiredStorageClass != nil {
2✔
316
                if deleted, err := r.deleteOutdatedPendingPvc(ctx, pvc, desiredStorageClass.Name, dataImportCron.Name); deleted || err != nil {
2✔
317
                        return res, err
1✔
318
                }
1✔
319
                currentSc, hasCurrent := dataImportCron.Annotations[AnnStorageClass]
1✔
320
                desiredSc := desiredStorageClass.Name
1✔
321
                if hasCurrent && currentSc != desiredSc {
2✔
322
                        r.log.Info("Storage class changed, delete most recent source on the old sc as it's no longer the desired", "currentSc", currentSc, "desiredSc", desiredSc)
1✔
323
                        if err := r.handleStorageClassChange(ctx, dataImportCron, desiredSc); err != nil {
1✔
324
                                return res, err
×
325
                        }
×
326
                        return reconcile.Result{RequeueAfter: time.Second}, nil
1✔
327
                }
328
                cc.AddAnnotation(dataImportCron, AnnStorageClass, desiredStorageClass.Name)
1✔
329
        }
330
        format, err := r.getSourceFormat(ctx, desiredStorageClass)
1✔
331
        if err != nil {
1✔
332
                return res, err
×
333
        }
×
334
        snapshot, err := r.getSnapshot(ctx, dataImportCron)
1✔
335
        if err != nil {
1✔
336
                return res, err
×
337
        }
×
338

339
        // Shared handling of a populated PVC: refresh the PVC's last-use annotation and labels (when a PVC exists),
        // mark the import as succeeded, then delegate to handleCronFormat
        handlePopulatedPvc := func() error {
2✔
340
                if pvc != nil {
2✔
341
                        if err := r.updateSource(ctx, dataImportCron, pvc); err != nil {
1✔
342
                                return err
×
343
                        }
×
344
                }
345
                importSucceeded = true
1✔
346
                if err := r.handleCronFormat(ctx, dataImportCron, pvc, format, desiredStorageClass); err != nil {
1✔
347
                        return err
×
348
                }
×
349

350
                return nil
1✔
351
        }
352

353
        // Precedence of import state: an existing DataVolume, then a bound PVC, then a snapshot, else no import
        switch {
1✔
354
        case dv != nil:
1✔
355
                switch dv.Status.Phase {
1✔
356
                case cdiv1.Succeeded:
1✔
357
                        if err := handlePopulatedPvc(); err != nil {
1✔
358
                                return res, err
×
359
                        }
×
360
                case cdiv1.ImportScheduled:
1✔
361
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "Import is scheduled", scheduled)
1✔
362
                case cdiv1.ImportInProgress:
1✔
363
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionTrue, "Import is progressing", inProgress)
1✔
364
                default:
1✔
365
                        dvPhase := string(dv.Status.Phase)
1✔
366
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dvPhase), dvPhase)
1✔
367
                }
368
        case pvc != nil && pvc.Status.Phase == corev1.ClaimBound:
1✔
369
                if err := handlePopulatedPvc(); err != nil {
1✔
370
                        return res, err
×
371
                }
×
372
        case snapshot != nil:
1✔
373
                if format == cdiv1.DataImportCronSourceFormatPvc {
1✔
374
                        if err := r.client.Delete(ctx, snapshot); cc.IgnoreNotFound(err) != nil {
×
375
                                return res, err
×
376
                        }
×
377
                        r.log.Info("Snapshot is around even though format switched to PVC, requeueing")
×
378
                        return reconcile.Result{RequeueAfter: time.Second}, nil
×
379
                }
380
                // Below k8s 1.29 there's no way to know the source volume mode
381
                // Let's at least expose this info on our own snapshots
382
                if _, ok := snapshot.Annotations[cc.AnnSourceVolumeMode]; !ok {
2✔
383
                        volMode, err := inferVolumeModeForSnapshot(ctx, r.client, dataImportCron)
1✔
384
                        if err != nil {
1✔
385
                                return res, err
×
386
                        }
×
387
                        if volMode != nil {
2✔
388
                                cc.AddAnnotation(snapshot, cc.AnnSourceVolumeMode, string(*volMode))
1✔
389
                        }
1✔
390
                }
391
                // Copy labels found on dataSource to the existing snapshot in case of upgrades.
392
                dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
393
                if err != nil {
2✔
394
                        if !k8serrors.IsNotFound(err) && !errors.Is(err, ErrNotManagedByCron) {
1✔
395
                                return res, err
×
396
                        }
×
397
                } else {
1✔
398
                        cc.CopyAllowedLabels(dataSource.Labels, snapshot, true)
1✔
399
                }
1✔
400
                if err := r.updateSource(ctx, dataImportCron, snapshot); err != nil {
1✔
401
                        return res, err
×
402
                }
×
403
                importSucceeded = true
1✔
404
        default:
1✔
405
                // Nothing exists for the first current import, so drop that entry
                if len(imports) > 0 {
2✔
406
                        imports = imports[1:]
1✔
407
                        dataImportCron.Status.CurrentImports = imports
1✔
408
                }
1✔
409
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
410
        }
411

412
        if importSucceeded {
2✔
413
                if err := updateDataImportCronOnSuccess(dataImportCron); err != nil {
1✔
414
                        return res, err
×
415
                }
×
416
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
1✔
417
                if err := r.garbageCollectOldImports(ctx, dataImportCron); err != nil {
1✔
418
                        return res, err
×
419
                }
×
420
        }
421

422
        if err := r.updateDataSource(ctx, dataImportCron, format); err != nil {
1✔
423
                return res, err
×
424
        }
×
425

426
        // Skip if schedule is disabled
427
        if isImageStreamSource(dataImportCron) && dataImportCron.Spec.Schedule != "" {
2✔
428
                // We use the poll returned reconcile.Result for RequeueAfter if needed
1✔
429
                pollRes, err := r.pollImageStreamDigest(ctx, dataImportCron)
1✔
430
                if err != nil {
2✔
431
                        return pollRes, err
1✔
432
                }
1✔
433
                res = pollRes
1✔
434
        }
435

436
        desiredDigest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
437
        // A new import is needed when a desired digest is set and it differs from the first current import's digest, or there is no current import
        digestUpdated := desiredDigest != "" && (len(imports) == 0 || desiredDigest != imports[0].Digest)
1✔
438
        if digestUpdated {
2✔
439
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Source digest updated since last import", outdated)
1✔
440
                if dv != nil {
1✔
441
                        if err := r.deleteErroneousDataVolume(ctx, dataImportCron, dv); err != nil {
×
442
                                return res, err
×
443
                        }
×
444
                }
445
                // NOTE(review): deleteErroneousDataVolume resets dataImportCron.Status.CurrentImports but not the local imports slice tested here — confirm this is intended
                if importSucceeded || len(imports) == 0 {
2✔
446
                        if err := r.createImportDataVolume(ctx, dataImportCron); err != nil {
2✔
447
                                return res, err
1✔
448
                        }
1✔
449
                }
450
        } else if importSucceeded {
2✔
451
                if err := r.updateDataImportCronSuccessCondition(dataImportCron, format, snapshot); err != nil {
1✔
452
                        return res, err
×
453
                }
×
454
        } else if len(imports) > 0 {
2✔
455
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Import is progressing", inProgress)
1✔
456
        } else {
2✔
457
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "No source digest", noDigest)
1✔
458
        }
1✔
459

460
        if err := updateLastExecutionTimestamp(dataImportCron); err != nil {
1✔
461
                return res, err
×
462
        }
×
463

464
        // Persist only when this pass actually changed the cron
        if !reflect.DeepEqual(dataImportCron, dataImportCronCopy) {
2✔
465
                if err := r.client.Update(ctx, dataImportCron); err != nil {
1✔
466
                        return res, err
×
467
                }
×
468
        }
469
        return res, nil
1✔
470
}
471

472
// Returns the current import DV if exists, and the last imported PVC
473
func (r *DataImportCronReconciler) getImportState(ctx context.Context, cron *cdiv1.DataImportCron) (*cdiv1.DataVolume, *corev1.PersistentVolumeClaim, error) {
1✔
474
        imports := cron.Status.CurrentImports
1✔
475
        if len(imports) == 0 {
2✔
476
                return nil, nil, nil
1✔
477
        }
1✔
478

479
        dvName := imports[0].DataVolumeName
1✔
480
        dv := &cdiv1.DataVolume{}
1✔
481
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, dv); err != nil {
2✔
482
                if !k8serrors.IsNotFound(err) {
1✔
483
                        return nil, nil, err
×
484
                }
×
485
                dv = nil
1✔
486
        }
487

488
        pvc := &corev1.PersistentVolumeClaim{}
1✔
489
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, pvc); err != nil {
2✔
490
                if !k8serrors.IsNotFound(err) {
1✔
491
                        return nil, nil, err
×
492
                }
×
493
                pvc = nil
1✔
494
        }
495
        return dv, pvc, nil
1✔
496
}
497

498
// Returns the current import DV if exists, and the last imported PVC
499
func (r *DataImportCronReconciler) getSnapshot(ctx context.Context, cron *cdiv1.DataImportCron) (*snapshotv1.VolumeSnapshot, error) {
1✔
500
        imports := cron.Status.CurrentImports
1✔
501
        if len(imports) == 0 {
2✔
502
                return nil, nil
1✔
503
        }
1✔
504

505
        snapName := imports[0].DataVolumeName
1✔
506
        snapshot := &snapshotv1.VolumeSnapshot{}
1✔
507
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: snapName}, snapshot); err != nil {
2✔
508
                if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
509
                        return nil, err
×
510
                }
×
511
                return nil, nil
1✔
512
        }
513

514
        return snapshot, nil
1✔
515
}
516

517
func (r *DataImportCronReconciler) getDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (*cdiv1.DataSource, error) {
1✔
518
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
519
        dataSource := &cdiv1.DataSource{}
1✔
520
        if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dataSourceName}, dataSource); err != nil {
2✔
521
                return nil, err
1✔
522
        }
1✔
523
        if dataSource.Labels[common.DataImportCronLabel] != dataImportCron.Name {
1✔
524
                log := r.log.WithName("getCronManagedDataSource")
×
525
                log.Info("DataSource has no DataImportCron label or is not managed by cron, so it is not updated", "name", dataSourceName, "uid", dataSource.UID, "cron", dataImportCron.Name)
×
526
                return nil, ErrNotManagedByCron
×
527
        }
×
528
        return dataSource, nil
1✔
529
}
530

531
func (r *DataImportCronReconciler) updateSource(ctx context.Context, cron *cdiv1.DataImportCron, obj client.Object) error {
1✔
532
        objCopy := obj.DeepCopyObject()
1✔
533
        cc.AddAnnotation(obj, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
534
        r.setDataImportCronResourceLabels(cron, obj)
1✔
535
        if !reflect.DeepEqual(obj, objCopy) {
2✔
536
                if err := r.client.Update(ctx, obj); err != nil {
1✔
537
                        return err
×
538
                }
×
539
        }
540
        return nil
1✔
541
}
542

543
func (r *DataImportCronReconciler) deleteErroneousDataVolume(ctx context.Context, cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) error {
×
544
        log := r.log.WithValues("name", dv.Name).WithValues("uid", dv.UID)
×
545
        if cond := dvc.FindConditionByType(cdiv1.DataVolumeRunning, dv.Status.Conditions); cond != nil {
×
546
                if cond.Status == corev1.ConditionFalse && cond.Reason == common.GenericError {
×
547
                        log.Info("Delete DataVolume and reset DesiredDigest due to error", "message", cond.Message)
×
548
                        // Unlabel the DV before deleting it, to eliminate reconcile before DIC is updated
×
549
                        dv.Labels[common.DataImportCronLabel] = ""
×
550
                        if err := r.client.Update(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
551
                                return err
×
552
                        }
×
553
                        if err := r.client.Delete(ctx, dv); cc.IgnoreNotFound(err) != nil {
×
554
                                return err
×
555
                        }
×
556
                        cron.Status.CurrentImports = nil
×
557
                }
558
        }
559
        return nil
×
560
}
561

562
func (r *DataImportCronReconciler) updateImageStreamDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
563
        log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
1✔
564
        regSource, err := getCronRegistrySource(dataImportCron)
1✔
565
        if err != nil {
1✔
566
                return err
×
567
        }
×
568
        if regSource.ImageStream == nil {
1✔
569
                return nil
×
570
        }
×
571
        imageStream, imageStreamTag, err := r.getImageStream(ctx, *regSource.ImageStream, dataImportCron.Namespace)
1✔
572
        if err != nil {
2✔
573
                return err
1✔
574
        }
1✔
575
        digest, dockerRef, err := getImageStreamDigest(imageStream, imageStreamTag)
1✔
576
        if err != nil {
2✔
577
                return err
1✔
578
        }
1✔
579
        cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
1✔
580
        if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
2✔
581
                log.Info("Updating DataImportCron", "digest", digest)
1✔
582
                cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
1✔
583
                cc.AddAnnotation(dataImportCron, AnnImageStreamDockerRef, dockerRef)
1✔
584
        }
1✔
585
        return nil
1✔
586
}
587

588
func (r *DataImportCronReconciler) updateDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat) error {
1✔
589
        log := r.log.WithName("updateDataSource")
1✔
590
        dataSource, err := r.getDataSource(ctx, dataImportCron)
1✔
591
        if err != nil {
2✔
592
                if k8serrors.IsNotFound(err) {
2✔
593
                        dataSource = r.newDataSource(dataImportCron)
1✔
594
                        if err := r.client.Create(ctx, dataSource); err != nil {
1✔
595
                                return err
×
596
                        }
×
597
                        log.Info("DataSource created", "name", dataSource.Name, "uid", dataSource.UID)
1✔
598
                } else if errors.Is(err, ErrNotManagedByCron) {
×
599
                        return nil
×
600
                } else {
×
601
                        return err
×
602
                }
×
603
        }
604
        dataSourceCopy := dataSource.DeepCopy()
1✔
605
        r.setDataImportCronResourceLabels(dataImportCron, dataSource)
1✔
606

1✔
607
        sourcePVC := dataImportCron.Status.LastImportedPVC
1✔
608
        populateDataSource(format, dataSource, sourcePVC)
1✔
609

1✔
610
        if !reflect.DeepEqual(dataSource, dataSourceCopy) {
2✔
611
                if err := r.client.Update(ctx, dataSource); err != nil {
1✔
612
                        return err
×
613
                }
×
614
        }
615

616
        return nil
1✔
617
}
618

619
func populateDataSource(format cdiv1.DataImportCronSourceFormat, dataSource *cdiv1.DataSource, sourcePVC *cdiv1.DataVolumeSourcePVC) {
1✔
620
        if sourcePVC == nil {
2✔
621
                return
1✔
622
        }
1✔
623

624
        switch format {
1✔
625
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
626
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
627
                        PVC: sourcePVC,
1✔
628
                }
1✔
629
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
630
                dataSource.Spec.Source = cdiv1.DataSourceSource{
1✔
631
                        Snapshot: &cdiv1.DataVolumeSourceSnapshot{
1✔
632
                                Namespace: sourcePVC.Namespace,
1✔
633
                                Name:      sourcePVC.Name,
1✔
634
                        },
1✔
635
                }
1✔
636
        }
637
}
638

639
func updateDataImportCronOnSuccess(dataImportCron *cdiv1.DataImportCron) error {
1✔
640
        if dataImportCron.Status.CurrentImports == nil {
1✔
641
                return errors.Errorf("No CurrentImports in cron %s", dataImportCron.Name)
×
642
        }
×
643
        sourcePVC := &cdiv1.DataVolumeSourcePVC{
1✔
644
                Namespace: dataImportCron.Namespace,
1✔
645
                Name:      dataImportCron.Status.CurrentImports[0].DataVolumeName,
1✔
646
        }
1✔
647
        if dataImportCron.Status.LastImportedPVC == nil || *dataImportCron.Status.LastImportedPVC != *sourcePVC {
2✔
648
                dataImportCron.Status.LastImportedPVC = sourcePVC
1✔
649
                now := metav1.Now()
1✔
650
                dataImportCron.Status.LastImportTimestamp = &now
1✔
651
        }
1✔
652
        return nil
1✔
653
}
654

655
func updateLastExecutionTimestamp(cron *cdiv1.DataImportCron) error {
1✔
656
        lastTimeStr := cron.Annotations[AnnLastCronTime]
1✔
657
        if lastTimeStr == "" {
2✔
658
                return nil
1✔
659
        }
1✔
660
        lastTime, err := time.Parse(time.RFC3339, lastTimeStr)
1✔
661
        if err != nil {
1✔
662
                return err
×
663
        }
×
664
        if ts := cron.Status.LastExecutionTimestamp; ts == nil || ts.Time != lastTime {
2✔
665
                cron.Status.LastExecutionTimestamp = &metav1.Time{Time: lastTime}
1✔
666
        }
1✔
667
        return nil
1✔
668
}
669

670
func (r *DataImportCronReconciler) createImportDataVolume(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
1✔
671
        dataSourceName := dataImportCron.Spec.ManagedDataSource
1✔
672
        digest := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
673
        if digest == "" {
1✔
674
                return nil
×
675
        }
×
676
        dvName, err := createDvName(dataSourceName, digest)
1✔
677
        if err != nil {
2✔
678
                return err
1✔
679
        }
1✔
680
        dataImportCron.Status.CurrentImports = []cdiv1.ImportStatus{{DataVolumeName: dvName, Digest: digest}}
1✔
681

1✔
682
        sources := []client.Object{&snapshotv1.VolumeSnapshot{}, &corev1.PersistentVolumeClaim{}}
1✔
683
        for _, src := range sources {
2✔
684
                if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dvName}, src); err != nil {
2✔
685
                        if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
1✔
686
                                return err
×
687
                        }
×
688
                } else {
1✔
689
                        if err := r.updateSource(ctx, dataImportCron, src); err != nil {
1✔
690
                                return err
×
691
                        }
×
692
                        // If source exists don't create DV
693
                        return nil
1✔
694
                }
695
        }
696

697
        dv := r.newSourceDataVolume(dataImportCron, dvName)
1✔
698
        if err := r.client.Create(ctx, dv); err != nil && !k8serrors.IsAlreadyExists(err) {
1✔
699
                return err
×
700
        }
×
701

702
        return nil
1✔
703
}
704

705
func (r *DataImportCronReconciler) handleStorageClassChange(ctx context.Context, dataImportCron *cdiv1.DataImportCron, desiredStorageClass string) error {
1✔
706
        digest, ok := dataImportCron.Annotations[AnnSourceDesiredDigest]
1✔
707
        if !ok {
1✔
708
                // nothing to delete
×
709
                return nil
×
710
        }
×
711
        name, err := createDvName(dataImportCron.Spec.ManagedDataSource, digest)
1✔
712
        if err != nil {
1✔
713
                return err
×
714
        }
×
715
        om := metav1.ObjectMeta{Name: name, Namespace: dataImportCron.Namespace}
1✔
716
        sources := []client.Object{&snapshotv1.VolumeSnapshot{ObjectMeta: om}, &cdiv1.DataVolume{ObjectMeta: om}, &corev1.PersistentVolumeClaim{ObjectMeta: om}}
1✔
717
        for _, src := range sources {
2✔
718
                if err := r.client.Delete(ctx, src); cc.IgnoreNotFound(err) != nil {
1✔
719
                        return err
×
720
                }
×
721
        }
722
        for _, src := range sources {
2✔
723
                if err := r.client.Get(ctx, client.ObjectKeyFromObject(src), src); err == nil || !k8serrors.IsNotFound(err) {
1✔
724
                        return fmt.Errorf("waiting for old sources to get cleaned up: %w", err)
×
725
                }
×
726
        }
727
        // Only update desired storage class once garbage collection went through
728
        annPatch := fmt.Sprintf(`[{"op":"add","path":"/metadata/annotations/%s","value":"%s" }]`, openapicommon.EscapeJsonPointer(AnnStorageClass), desiredStorageClass)
1✔
729
        err = r.client.Patch(ctx, dataImportCron, client.RawPatch(types.JSONPatchType, []byte(annPatch)))
1✔
730
        if err != nil {
1✔
731
                return err
×
732
        }
×
733

734
        return nil
1✔
735
}
736

737
func (r *DataImportCronReconciler) handleCronFormat(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, format cdiv1.DataImportCronSourceFormat, desiredStorageClass *storagev1.StorageClass) error {
1✔
738
        switch format {
1✔
739
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
740
                return nil
1✔
741
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
742
                return r.handleSnapshot(ctx, dataImportCron, pvc, desiredStorageClass)
1✔
743
        default:
×
744
                return fmt.Errorf("unknown source format for snapshot")
×
745
        }
746
}
747

748
func (r *DataImportCronReconciler) handleSnapshot(ctx context.Context, dataImportCron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim, desiredStorageClass *storagev1.StorageClass) error {
1✔
749
        if sc := pvc.Spec.StorageClassName; sc != nil && *sc != desiredStorageClass.Name {
2✔
750
                r.log.Info("Attempt to change storage class, will not try making a snapshot of the old PVC")
1✔
751
                return nil
1✔
752
        }
1✔
753
        storageProfile := &cdiv1.StorageProfile{}
1✔
754
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
755
                return err
×
756
        }
×
757
        className, err := cc.GetSnapshotClassForSmartClone(pvc, &desiredStorageClass.Name, storageProfile.Status.SnapshotClass, r.log, r.client, r.recorder)
1✔
758
        if err != nil {
1✔
759
                return err
×
760
        }
×
761
        desiredSnapshot := &snapshotv1.VolumeSnapshot{
1✔
762
                ObjectMeta: metav1.ObjectMeta{
1✔
763
                        Name:      pvc.Name,
1✔
764
                        Namespace: dataImportCron.Namespace,
1✔
765
                        Labels: map[string]string{
1✔
766
                                common.CDILabelKey:       common.CDILabelValue,
1✔
767
                                common.CDIComponentLabel: "",
1✔
768
                        },
1✔
769
                },
1✔
770
                Spec: snapshotv1.VolumeSnapshotSpec{
1✔
771
                        Source: snapshotv1.VolumeSnapshotSource{
1✔
772
                                PersistentVolumeClaimName: &pvc.Name,
1✔
773
                        },
1✔
774
                        VolumeSnapshotClassName: &className,
1✔
775
                },
1✔
776
        }
1✔
777
        r.setDataImportCronResourceLabels(dataImportCron, desiredSnapshot)
1✔
778
        cc.CopyAllowedLabels(pvc.GetLabels(), desiredSnapshot, false)
1✔
779

1✔
780
        currentSnapshot := &snapshotv1.VolumeSnapshot{}
1✔
781
        if err := r.client.Get(ctx, client.ObjectKeyFromObject(desiredSnapshot), currentSnapshot); err != nil {
2✔
782
                if !k8serrors.IsNotFound(err) {
1✔
783
                        return err
×
784
                }
×
785
                cc.AddAnnotation(desiredSnapshot, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
786
                if pvc.Spec.VolumeMode != nil {
2✔
787
                        cc.AddAnnotation(desiredSnapshot, cc.AnnSourceVolumeMode, string(*pvc.Spec.VolumeMode))
1✔
788
                }
1✔
789
                if err := r.client.Create(ctx, desiredSnapshot); err != nil {
1✔
790
                        return err
×
791
                }
×
792
        } else {
1✔
793
                if cc.IsSnapshotReady(currentSnapshot) {
2✔
794
                        // Clean up DV/PVC as they are not needed anymore
1✔
795
                        r.log.Info("Deleting dv/pvc as snapshot is ready", "name", desiredSnapshot.Name)
1✔
796
                        if err := r.deleteDvPvc(ctx, desiredSnapshot.Name, desiredSnapshot.Namespace); err != nil {
1✔
797
                                return err
×
798
                        }
×
799
                }
800
        }
801

802
        return nil
1✔
803
}
804

805
func (r *DataImportCronReconciler) updateDataImportCronSuccessCondition(dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat, snapshot *snapshotv1.VolumeSnapshot) error {
1✔
806
        dataImportCron.Status.SourceFormat = &format
1✔
807

1✔
808
        switch format {
1✔
809
        case cdiv1.DataImportCronSourceFormatPvc:
1✔
810
                updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
811
        case cdiv1.DataImportCronSourceFormatSnapshot:
1✔
812
                if snapshot == nil {
2✔
813
                        // Snapshot create/update will trigger reconcile
1✔
814
                        return nil
1✔
815
                }
1✔
816
                if cc.IsSnapshotReady(snapshot) {
2✔
817
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
1✔
818
                } else {
2✔
819
                        updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Snapshot of imported data is progressing", inProgress)
1✔
820
                }
1✔
821
        default:
×
822
                return fmt.Errorf("unknown source format for snapshot")
×
823
        }
824

825
        return nil
1✔
826
}
827

828
func (r *DataImportCronReconciler) getSourceFormat(ctx context.Context, desiredStorageClass *storagev1.StorageClass) (cdiv1.DataImportCronSourceFormat, error) {
1✔
829
        format := cdiv1.DataImportCronSourceFormatPvc
1✔
830
        if desiredStorageClass == nil {
2✔
831
                return format, nil
1✔
832
        }
1✔
833

834
        storageProfile := &cdiv1.StorageProfile{}
1✔
835
        if err := r.client.Get(ctx, types.NamespacedName{Name: desiredStorageClass.Name}, storageProfile); err != nil {
1✔
836
                return format, err
×
837
        }
×
838
        if storageProfile.Status.DataImportCronSourceFormat != nil {
2✔
839
                format = *storageProfile.Status.DataImportCronSourceFormat
1✔
840
        }
1✔
841

842
        return format, nil
1✔
843
}
844

845
func (r *DataImportCronReconciler) garbageCollectOldImports(ctx context.Context, cron *cdiv1.DataImportCron) error {
1✔
846
        if cron.Spec.GarbageCollect != nil && *cron.Spec.GarbageCollect != cdiv1.DataImportCronGarbageCollectOutdated {
1✔
847
                return nil
×
848
        }
×
849
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name})
1✔
850
        if err != nil {
1✔
851
                return err
×
852
        }
×
853

854
        maxImports := defaultImportsToKeepPerCron
1✔
855

1✔
856
        if cron.Spec.ImportsToKeep != nil && *cron.Spec.ImportsToKeep >= 0 {
2✔
857
                maxImports = int(*cron.Spec.ImportsToKeep)
1✔
858
        }
1✔
859

860
        if err := r.garbageCollectPVCs(ctx, cron.Namespace, cron.Name, selector, maxImports); err != nil {
1✔
861
                return err
×
862
        }
×
863
        if err := r.garbageCollectSnapshots(ctx, cron.Namespace, selector, maxImports); err != nil {
1✔
864
                return err
×
865
        }
×
866

867
        return nil
1✔
868
}
869

870
func (r *DataImportCronReconciler) garbageCollectPVCs(ctx context.Context, namespace, cronName string, selector labels.Selector, maxImports int) error {
1✔
871
        pvcList := &corev1.PersistentVolumeClaimList{}
1✔
872

1✔
873
        if err := r.client.List(ctx, pvcList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
874
                return err
×
875
        }
×
876
        if len(pvcList.Items) > maxImports {
2✔
877
                sort.Slice(pvcList.Items, func(i, j int) bool {
2✔
878
                        return pvcList.Items[i].Annotations[AnnLastUseTime] > pvcList.Items[j].Annotations[AnnLastUseTime]
1✔
879
                })
1✔
880
                for _, pvc := range pvcList.Items[maxImports:] {
2✔
881
                        r.log.Info("Deleting dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
882
                        if err := r.deleteDvPvc(ctx, pvc.Name, pvc.Namespace); err != nil {
1✔
883
                                return err
×
884
                        }
×
885
                }
886
        }
887

888
        dvList := &cdiv1.DataVolumeList{}
1✔
889
        if err := r.client.List(ctx, dvList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
890
                return err
×
891
        }
×
892

893
        if len(dvList.Items) > maxImports {
2✔
894
                for _, dv := range dvList.Items {
2✔
895
                        pvc := &corev1.PersistentVolumeClaim{}
1✔
896
                        if err := r.client.Get(ctx, types.NamespacedName{Namespace: namespace, Name: dv.Name}, pvc); err != nil {
1✔
897
                                return err
×
898
                        }
×
899

900
                        if pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
901
                                r.log.Info("Deleting old version dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
1✔
902
                                if err := r.deleteDvPvc(ctx, dv.Name, dv.Namespace); err != nil {
1✔
903
                                        return err
×
904
                                }
×
905
                        }
906
                }
907
        }
908

909
        return nil
1✔
910
}
911

912
// deleteDvPvc deletes DV or PVC if DV was GCed
913
func (r *DataImportCronReconciler) deleteDvPvc(ctx context.Context, name, namespace string) error {
1✔
914
        om := metav1.ObjectMeta{Name: name, Namespace: namespace}
1✔
915
        dv := &cdiv1.DataVolume{ObjectMeta: om}
1✔
916
        if err := r.client.Delete(ctx, dv); err == nil || !k8serrors.IsNotFound(err) {
2✔
917
                return err
1✔
918
        }
1✔
919
        pvc := &corev1.PersistentVolumeClaim{ObjectMeta: om}
1✔
920
        if err := r.client.Delete(ctx, pvc); err != nil && !k8serrors.IsNotFound(err) {
1✔
921
                return err
×
922
        }
×
923
        return nil
1✔
924
}
925

926
func (r *DataImportCronReconciler) garbageCollectSnapshots(ctx context.Context, namespace string, selector labels.Selector, maxImports int) error {
1✔
927
        snapList := &snapshotv1.VolumeSnapshotList{}
1✔
928

1✔
929
        if err := r.client.List(ctx, snapList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
1✔
930
                if meta.IsNoMatchError(err) {
×
931
                        return nil
×
932
                }
×
933
                return err
×
934
        }
935
        if len(snapList.Items) > maxImports {
1✔
936
                sort.Slice(snapList.Items, func(i, j int) bool {
×
937
                        return snapList.Items[i].Annotations[AnnLastUseTime] > snapList.Items[j].Annotations[AnnLastUseTime]
×
938
                })
×
939
                for _, snap := range snapList.Items[maxImports:] {
×
940
                        r.log.Info("Deleting snapshot", "name", snap.Name, "uid", snap.UID)
×
941
                        if err := r.client.Delete(ctx, &snap); err != nil && !k8serrors.IsNotFound(err) {
×
942
                                return err
×
943
                        }
×
944
                }
945
        }
946

947
        return nil
1✔
948
}
949

950
func (r *DataImportCronReconciler) cleanup(ctx context.Context, cron types.NamespacedName) error {
1✔
951
        // Don't keep alerting over a cron thats being deleted, will get set back to 1 again by reconcile loop if needed.
1✔
952
        metrics.DeleteDataImportCronOutdated(getPrometheusCronLabels(cron.Namespace, cron.Name))
1✔
953

1✔
954
        if err := r.deleteJobs(ctx, cron); err != nil {
1✔
955
                return err
×
956
        }
×
957
        selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name, common.DataImportCronCleanupLabel: "true"})
1✔
958
        if err != nil {
1✔
959
                return err
×
960
        }
×
961
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: cron.Namespace, LabelSelector: selector}}
1✔
962
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataSource{}, opts); err != nil {
1✔
963
                return err
×
964
        }
×
965
        if err := r.client.DeleteAllOf(ctx, &cdiv1.DataVolume{}, opts); err != nil {
1✔
966
                return err
×
967
        }
×
968
        if err := r.client.DeleteAllOf(ctx, &corev1.PersistentVolumeClaim{}, opts); err != nil {
1✔
969
                return err
×
970
        }
×
971
        if err := r.client.DeleteAllOf(ctx, &snapshotv1.VolumeSnapshot{}, opts); cc.IgnoreIsNoMatchError(err) != nil {
1✔
972
                return err
×
973
        }
×
974
        return nil
1✔
975
}
976

977
func (r *DataImportCronReconciler) deleteJobs(ctx context.Context, cron types.NamespacedName) error {
1✔
978
        deleteOpts := client.DeleteOptions{PropagationPolicy: ptr.To[metav1.DeletionPropagation](metav1.DeletePropagationBackground)}
1✔
979
        selector, err := getSelector(map[string]string{common.DataImportCronNsLabel: cron.Namespace, common.DataImportCronLabel: cron.Name})
1✔
980
        if err != nil {
1✔
981
                return err
×
982
        }
×
983
        opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: r.cdiNamespace, LabelSelector: selector}, DeleteOptions: deleteOpts}
1✔
984
        if err := r.client.DeleteAllOf(ctx, &batchv1.CronJob{}, opts); err != nil {
1✔
985
                return err
×
986
        }
×
987
        if err := r.client.DeleteAllOf(ctx, &batchv1.Job{}, opts); err != nil {
1✔
988
                return err
×
989
        }
×
990

991
        return nil
1✔
992
}
993

994
// NewDataImportCronController creates a new instance of the DataImportCron controller
995
func NewDataImportCronController(mgr manager.Manager, log logr.Logger, importerImage, pullPolicy string, installerLabels map[string]string) (controller.Controller, error) {
×
996
        uncachedClient, err := client.New(mgr.GetConfig(), client.Options{
×
997
                Scheme: mgr.GetScheme(),
×
998
                Mapper: mgr.GetRESTMapper(),
×
999
        })
×
1000
        if err != nil {
×
1001
                return nil, err
×
1002
        }
×
1003
        reconciler := &DataImportCronReconciler{
×
1004
                client:          mgr.GetClient(),
×
1005
                uncachedClient:  uncachedClient,
×
1006
                recorder:        mgr.GetEventRecorderFor(dataImportControllerName),
×
1007
                scheme:          mgr.GetScheme(),
×
1008
                log:             log.WithName(dataImportControllerName),
×
1009
                image:           importerImage,
×
1010
                pullPolicy:      pullPolicy,
×
1011
                cdiNamespace:    util.GetNamespace(),
×
1012
                installerLabels: installerLabels,
×
1013
        }
×
1014
        dataImportCronController, err := controller.New(dataImportControllerName, mgr, controller.Options{
×
1015
                MaxConcurrentReconciles: 3,
×
1016
                Reconciler:              reconciler,
×
1017
        })
×
1018
        if err != nil {
×
1019
                return nil, err
×
1020
        }
×
1021
        if err := addDataImportCronControllerWatches(mgr, dataImportCronController); err != nil {
×
1022
                return nil, err
×
1023
        }
×
1024
        log.Info("Initialized DataImportCron controller")
×
1025
        return dataImportCronController, nil
×
1026
}
1027

1028
func getCronName(obj client.Object) string {
×
1029
        return obj.GetLabels()[common.DataImportCronLabel]
×
1030
}
×
1031

1032
func getCronNs(obj client.Object) string {
×
1033
        return obj.GetLabels()[common.DataImportCronNsLabel]
×
1034
}
×
1035

1036
func mapSourceObjectToCron[T client.Object](_ context.Context, obj T) []reconcile.Request {
×
1037
        if cronName := getCronName(obj); cronName != "" {
×
1038
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: cronName, Namespace: obj.GetNamespace()}}}
×
1039
        }
×
1040
        return nil
×
1041
}
1042

1043
func addDataImportCronControllerWatches(mgr manager.Manager, c controller.Controller) error {
×
1044
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataImportCron{}, &handler.TypedEnqueueRequestForObject[*cdiv1.DataImportCron]{})); err != nil {
×
1045
                return err
×
1046
        }
×
1047

1048
        mapStorageProfileToCron := func(ctx context.Context, obj *cdiv1.StorageProfile) []reconcile.Request {
×
1049
                // TODO: Get rid of this after at least one version; use indexer on storage class annotation instead
×
1050
                // Otherwise we risk losing the storage profile event
×
1051
                var crons cdiv1.DataImportCronList
×
1052
                if err := mgr.GetClient().List(ctx, &crons); err != nil {
×
1053
                        c.GetLogger().Error(err, "Unable to list DataImportCrons")
×
1054
                        return nil
×
1055
                }
×
1056
                // Storage profiles are 1:1 to storage classes
1057
                scName := obj.GetName()
×
1058
                var reqs []reconcile.Request
×
1059
                for _, cron := range crons.Items {
×
1060
                        dataVolume := cron.Spec.Template
×
1061
                        explicitScName := cc.GetStorageClassFromDVSpec(&dataVolume)
×
1062
                        templateSc, err := cc.GetStorageClassByNameWithVirtFallback(ctx, mgr.GetClient(), explicitScName, dataVolume.Spec.ContentType)
×
1063
                        if err != nil || templateSc == nil {
×
1064
                                c.GetLogger().Error(err, "Unable to get storage class", "templateSc", templateSc)
×
1065
                                return reqs
×
1066
                        }
×
1067
                        if templateSc.Name == scName {
×
1068
                                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: cron.Namespace, Name: cron.Name}})
×
1069
                        }
×
1070
                }
1071
                return reqs
×
1072
        }
1073

1074
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataVolume{},
×
1075
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataVolume](mapSourceObjectToCron),
×
1076
                predicate.TypedFuncs[*cdiv1.DataVolume]{
×
1077
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataVolume]) bool { return false },
×
1078
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataVolume]) bool { return getCronName(e.ObjectNew) != "" },
×
1079
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataVolume]) bool { return getCronName(e.Object) != "" },
×
1080
                },
1081
        )); err != nil {
×
1082
                return err
×
1083
        }
×
1084

1085
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.DataSource{},
×
1086
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.DataSource](mapSourceObjectToCron),
×
1087
                predicate.TypedFuncs[*cdiv1.DataSource]{
×
1088
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.DataSource]) bool { return false },
×
1089
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.DataSource]) bool { return getCronName(e.ObjectNew) != "" },
×
1090
                        DeleteFunc: func(e event.TypedDeleteEvent[*cdiv1.DataSource]) bool { return getCronName(e.Object) != "" },
×
1091
                },
1092
        )); err != nil {
×
1093
                return err
×
1094
        }
×
1095

1096
        if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{},
×
1097
                handler.TypedEnqueueRequestsFromMapFunc[*corev1.PersistentVolumeClaim](mapSourceObjectToCron),
×
1098
                predicate.TypedFuncs[*corev1.PersistentVolumeClaim]{
×
1099
                        CreateFunc: func(event.TypedCreateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1100
                        UpdateFunc: func(event.TypedUpdateEvent[*corev1.PersistentVolumeClaim]) bool { return false },
×
1101
                        DeleteFunc: func(e event.TypedDeleteEvent[*corev1.PersistentVolumeClaim]) bool { return getCronName(e.Object) != "" },
×
1102
                },
1103
        )); err != nil {
×
1104
                return err
×
1105
        }
×
1106

1107
        if err := addDefaultStorageClassUpdateWatch(mgr, c); err != nil {
×
1108
                return err
×
1109
        }
×
1110

1111
        if err := c.Watch(source.Kind(mgr.GetCache(), &cdiv1.StorageProfile{},
×
1112
                handler.TypedEnqueueRequestsFromMapFunc[*cdiv1.StorageProfile](mapStorageProfileToCron),
×
1113
                predicate.TypedFuncs[*cdiv1.StorageProfile]{
×
1114
                        CreateFunc: func(event.TypedCreateEvent[*cdiv1.StorageProfile]) bool { return true },
×
1115
                        DeleteFunc: func(event.TypedDeleteEvent[*cdiv1.StorageProfile]) bool { return false },
×
1116
                        UpdateFunc: func(e event.TypedUpdateEvent[*cdiv1.StorageProfile]) bool {
×
1117
                                return e.ObjectOld.Status.DataImportCronSourceFormat != e.ObjectNew.Status.DataImportCronSourceFormat
×
1118
                        },
×
1119
                },
1120
        )); err != nil {
×
1121
                return err
×
1122
        }
×
1123

1124
        mapCronJobToCron := func(_ context.Context, obj *batchv1.CronJob) []reconcile.Request {
×
1125
                return []reconcile.Request{{NamespacedName: types.NamespacedName{Namespace: getCronNs(obj), Name: getCronName(obj)}}}
×
1126
        }
×
1127

1128
        if err := c.Watch(source.Kind(mgr.GetCache(), &batchv1.CronJob{},
×
1129
                handler.TypedEnqueueRequestsFromMapFunc[*batchv1.CronJob](mapCronJobToCron),
×
1130
                predicate.TypedFuncs[*batchv1.CronJob]{
×
1131
                        CreateFunc: func(e event.TypedCreateEvent[*batchv1.CronJob]) bool {
×
1132
                                return getCronName(e.Object) != "" && getCronNs(e.Object) != ""
×
1133
                        },
×
1134
                        DeleteFunc: func(event.TypedDeleteEvent[*batchv1.CronJob]) bool { return false },
×
1135
                        UpdateFunc: func(event.TypedUpdateEvent[*batchv1.CronJob]) bool { return false },
×
1136
                },
1137
        )); err != nil {
×
1138
                return err
×
1139
        }
×
1140

1141
        if err := mgr.GetClient().List(context.TODO(), &snapshotv1.VolumeSnapshotList{}); err != nil {
×
1142
                if meta.IsNoMatchError(err) {
×
1143
                        // Back out if there's no point to attempt watch
×
1144
                        return nil
×
1145
                }
×
1146
                if !cc.IsErrCacheNotStarted(err) {
×
1147
                        return err
×
1148
                }
×
1149
        }
1150
        if err := c.Watch(source.Kind(mgr.GetCache(), &snapshotv1.VolumeSnapshot{},
×
1151
                handler.TypedEnqueueRequestsFromMapFunc[*snapshotv1.VolumeSnapshot](mapSourceObjectToCron),
×
1152
                predicate.TypedFuncs[*snapshotv1.VolumeSnapshot]{
×
1153
                        CreateFunc: func(event.TypedCreateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1154
                        UpdateFunc: func(event.TypedUpdateEvent[*snapshotv1.VolumeSnapshot]) bool { return false },
×
1155
                        DeleteFunc: func(e event.TypedDeleteEvent[*snapshotv1.VolumeSnapshot]) bool { return getCronName(e.Object) != "" },
×
1156
                },
1157
        )); err != nil {
×
1158
                return err
×
1159
        }
×
1160

1161
        return nil
×
1162
}
1163

1164
// addDefaultStorageClassUpdateWatch watches for default/virt default storage class updates
1165
func addDefaultStorageClassUpdateWatch(mgr manager.Manager, c controller.Controller) error {
×
1166
        if err := c.Watch(source.Kind(mgr.GetCache(), &storagev1.StorageClass{},
×
1167
                handler.TypedEnqueueRequestsFromMapFunc[*storagev1.StorageClass](
×
1168
                        func(ctx context.Context, obj *storagev1.StorageClass) []reconcile.Request {
×
1169
                                log := c.GetLogger().WithName("DefaultStorageClassUpdateWatch")
×
1170
                                log.Info("Update", "sc", obj.GetName(),
×
1171
                                        "default", obj.GetAnnotations()[cc.AnnDefaultStorageClass] == "true",
×
1172
                                        "defaultVirt", obj.GetAnnotations()[cc.AnnDefaultVirtStorageClass] == "true")
×
1173
                                reqs, err := getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx, mgr.GetClient())
×
1174
                                if err != nil {
×
1175
                                        log.Error(err, "Failed getting DataImportCrons with pending PVCs")
×
1176
                                }
×
1177
                                return reqs
×
1178
                        },
1179
                ),
1180
                predicate.TypedFuncs[*storagev1.StorageClass]{
1181
                        CreateFunc: func(event.TypedCreateEvent[*storagev1.StorageClass]) bool { return false },
×
1182
                        DeleteFunc: func(event.TypedDeleteEvent[*storagev1.StorageClass]) bool { return false },
×
1183
                        UpdateFunc: func(e event.TypedUpdateEvent[*storagev1.StorageClass]) bool {
×
1184
                                return (e.ObjectNew.Annotations[cc.AnnDefaultStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultStorageClass]) ||
×
1185
                                        (e.ObjectNew.Annotations[cc.AnnDefaultVirtStorageClass] != e.ObjectOld.Annotations[cc.AnnDefaultVirtStorageClass])
×
1186
                        },
×
1187
                },
1188
        )); err != nil {
×
1189
                return err
×
1190
        }
×
1191

1192
        return nil
×
1193
}
1194

1195
func getReconcileRequestsForDicsWithoutExplicitStorageClass(ctx context.Context, c client.Client) ([]reconcile.Request, error) {
×
1196
        dicList := &cdiv1.DataImportCronList{}
×
1197
        if err := c.List(ctx, dicList); err != nil {
×
1198
                return nil, err
×
1199
        }
×
1200
        reqs := []reconcile.Request{}
×
1201
        for _, dic := range dicList.Items {
×
1202
                if cc.GetStorageClassFromDVSpec(&dic.Spec.Template) != nil {
×
1203
                        continue
×
1204
                }
1205

1206
                reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Name: dic.Name, Namespace: dic.Namespace}})
×
1207
        }
1208

1209
        return reqs, nil
×
1210
}
1211

1212
func (r *DataImportCronReconciler) deleteOutdatedPendingPvc(ctx context.Context, pvc *corev1.PersistentVolumeClaim, desiredStorageClass, cronName string) (bool, error) {
1✔
1213
        if pvc == nil || pvc.Status.Phase != corev1.ClaimPending || pvc.Labels[common.DataImportCronLabel] != cronName {
2✔
1214
                return false, nil
1✔
1215
        }
1✔
1216

1217
        sc := pvc.Spec.StorageClassName
1✔
1218
        if sc == nil || *sc == desiredStorageClass {
2✔
1219
                return false, nil
1✔
1220
        }
1✔
1221

1222
        r.log.Info("Delete pending pvc", "name", pvc.Name, "ns", pvc.Namespace, "sc", *sc)
1✔
1223
        if err := r.client.Delete(ctx, pvc); cc.IgnoreNotFound(err) != nil {
1✔
1224
                return false, err
×
1225
        }
×
1226

1227
        return true, nil
1✔
1228
}
1229

1230
func (r *DataImportCronReconciler) cronJobExistsAndUpdated(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
1✔
1231
        cronJob := &batchv1.CronJob{}
1✔
1232
        cronJobKey := types.NamespacedName{Namespace: r.cdiNamespace, Name: GetCronJobName(cron)}
1✔
1233
        if err := r.client.Get(ctx, cronJobKey, cronJob); err != nil {
2✔
1234
                return false, cc.IgnoreNotFound(err)
1✔
1235
        }
1✔
1236

1237
        cronJobCopy := cronJob.DeepCopy()
1✔
1238
        if err := r.initCronJob(cron, cronJobCopy); err != nil {
1✔
1239
                return false, err
×
1240
        }
×
1241

1242
        if !reflect.DeepEqual(cronJob, cronJobCopy) {
2✔
1243
                r.log.Info("Updating CronJob", "name", cronJob.GetName())
1✔
1244
                if err := r.client.Update(ctx, cronJobCopy); err != nil {
1✔
1245
                        return false, cc.IgnoreNotFound(err)
×
1246
                }
×
1247
        }
1248
        return true, nil
1✔
1249
}
1250

1251
func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batchv1.CronJob, error) {
1✔
1252
        cronJob := &batchv1.CronJob{
1✔
1253
                ObjectMeta: metav1.ObjectMeta{
1✔
1254
                        Name:      GetCronJobName(cron),
1✔
1255
                        Namespace: r.cdiNamespace,
1✔
1256
                },
1✔
1257
        }
1✔
1258
        if err := r.initCronJob(cron, cronJob); err != nil {
1✔
1259
                return nil, err
×
1260
        }
×
1261
        return cronJob, nil
1✔
1262
}
1263

1264
// InitPollerPodSpec inits poller PodSpec
1265
func InitPollerPodSpec(c client.Client, cron *cdiv1.DataImportCron, podSpec *corev1.PodSpec, image string, pullPolicy corev1.PullPolicy, log logr.Logger) error {
1✔
1266
        regSource, err := getCronRegistrySource(cron)
1✔
1267
        if err != nil {
1✔
1268
                return err
×
1269
        }
×
1270
        if regSource.URL == nil {
1✔
1271
                return errors.Errorf("No URL source in cron %s", cron.Name)
×
1272
        }
×
1273
        cdiConfig := &cdiv1.CDIConfig{}
1✔
1274
        if err := c.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiConfig); err != nil {
1✔
1275
                return err
×
1276
        }
×
1277
        insecureTLS, err := IsInsecureTLS(*regSource.URL, cdiConfig, log)
1✔
1278
        if err != nil {
1✔
1279
                return err
×
1280
        }
×
1281
        container := corev1.Container{
1✔
1282
                Name:  "cdi-source-update-poller",
1✔
1283
                Image: image,
1✔
1284
                Command: []string{
1✔
1285
                        "/usr/bin/cdi-source-update-poller",
1✔
1286
                        "-ns", cron.Namespace,
1✔
1287
                        "-cron", cron.Name,
1✔
1288
                        "-url", *regSource.URL,
1✔
1289
                },
1✔
1290
                ImagePullPolicy:          pullPolicy,
1✔
1291
                TerminationMessagePath:   corev1.TerminationMessagePathDefault,
1✔
1292
                TerminationMessagePolicy: corev1.TerminationMessageReadFile,
1✔
1293
        }
1✔
1294

1✔
1295
        var volumes []corev1.Volume
1✔
1296
        hasCertConfigMap := regSource.CertConfigMap != nil && *regSource.CertConfigMap != ""
1✔
1297
        if hasCertConfigMap {
1✔
1298
                vm := corev1.VolumeMount{
×
1299
                        Name:      CertVolName,
×
1300
                        MountPath: common.ImporterCertDir,
×
1301
                }
×
1302
                container.VolumeMounts = append(container.VolumeMounts, vm)
×
1303
                container.Command = append(container.Command, "-certdir", common.ImporterCertDir)
×
1304
                volumes = append(volumes, createConfigMapVolume(CertVolName, *regSource.CertConfigMap))
×
1305
        }
×
1306

1307
        if volName, _ := GetImportProxyConfig(cdiConfig, common.ImportProxyConfigMapName); volName != "" {
2✔
1308
                vm := corev1.VolumeMount{
1✔
1309
                        Name:      ProxyCertVolName,
1✔
1310
                        MountPath: common.ImporterProxyCertDir,
1✔
1311
                }
1✔
1312
                container.VolumeMounts = append(container.VolumeMounts, vm)
1✔
1313
                volumes = append(volumes, createConfigMapVolume(ProxyCertVolName, volName))
1✔
1314
        }
1✔
1315

1316
        if regSource.SecretRef != nil && *regSource.SecretRef != "" {
1✔
1317
                container.Env = append(container.Env,
×
1318
                        corev1.EnvVar{
×
1319
                                Name: common.ImporterAccessKeyID,
×
1320
                                ValueFrom: &corev1.EnvVarSource{
×
1321
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1322
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1323
                                                        Name: *regSource.SecretRef,
×
1324
                                                },
×
1325
                                                Key: common.KeyAccess,
×
1326
                                        },
×
1327
                                },
×
1328
                        },
×
1329
                        corev1.EnvVar{
×
1330
                                Name: common.ImporterSecretKey,
×
1331
                                ValueFrom: &corev1.EnvVarSource{
×
1332
                                        SecretKeyRef: &corev1.SecretKeySelector{
×
1333
                                                LocalObjectReference: corev1.LocalObjectReference{
×
1334
                                                        Name: *regSource.SecretRef,
×
1335
                                                },
×
1336
                                                Key: common.KeySecret,
×
1337
                                        },
×
1338
                                },
×
1339
                        },
×
1340
                )
×
1341
        }
×
1342

1343
        addEnvVar := func(varName, value string) {
2✔
1344
                container.Env = append(container.Env, corev1.EnvVar{Name: varName, Value: value})
1✔
1345
        }
1✔
1346

1347
        if insecureTLS {
1✔
1348
                addEnvVar(common.InsecureTLSVar, "true")
×
1349
        }
×
1350

1351
        addEnvVarFromImportProxyConfig := func(varName string) {
2✔
1352
                if value, err := GetImportProxyConfig(cdiConfig, varName); err == nil {
2✔
1353
                        addEnvVar(varName, value)
1✔
1354
                }
1✔
1355
        }
1356

1357
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTP)
1✔
1358
        addEnvVarFromImportProxyConfig(common.ImportProxyHTTPS)
1✔
1359
        addEnvVarFromImportProxyConfig(common.ImportProxyNoProxy)
1✔
1360

1✔
1361
        imagePullSecrets, err := cc.GetImagePullSecrets(c)
1✔
1362
        if err != nil {
1✔
1363
                return err
×
1364
        }
×
1365
        workloadNodePlacement, err := cc.GetWorkloadNodePlacement(context.TODO(), c)
1✔
1366
        if err != nil {
1✔
1367
                return err
×
1368
        }
×
1369

1370
        podSpec.RestartPolicy = corev1.RestartPolicyNever
1✔
1371
        podSpec.TerminationGracePeriodSeconds = ptr.To[int64](0)
1✔
1372
        podSpec.Containers = []corev1.Container{container}
1✔
1373
        podSpec.ServiceAccountName = common.CronJobServiceAccountName
1✔
1374
        podSpec.Volumes = volumes
1✔
1375
        podSpec.ImagePullSecrets = imagePullSecrets
1✔
1376
        podSpec.NodeSelector = workloadNodePlacement.NodeSelector
1✔
1377
        podSpec.Tolerations = workloadNodePlacement.Tolerations
1✔
1378
        podSpec.Affinity = workloadNodePlacement.Affinity
1✔
1379

1✔
1380
        cc.SetRestrictedSecurityContext(podSpec)
1✔
1381

1✔
1382
        return nil
1✔
1383
}
1384

1385
func (r *DataImportCronReconciler) initCronJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) error {
1✔
1386
        cronJobSpec := &cronJob.Spec
1✔
1387
        cronJobSpec.Schedule = cron.Spec.Schedule
1✔
1388
        cronJobSpec.ConcurrencyPolicy = batchv1.ForbidConcurrent
1✔
1389
        cronJobSpec.SuccessfulJobsHistoryLimit = ptr.To[int32](1)
1✔
1390
        cronJobSpec.FailedJobsHistoryLimit = ptr.To[int32](1)
1✔
1391

1✔
1392
        jobSpec := &cronJobSpec.JobTemplate.Spec
1✔
1393
        jobSpec.BackoffLimit = ptr.To[int32](2)
1✔
1394
        jobSpec.TTLSecondsAfterFinished = ptr.To[int32](10)
1✔
1395

1✔
1396
        podSpec := &jobSpec.Template.Spec
1✔
1397
        if err := InitPollerPodSpec(r.client, cron, podSpec, r.image, corev1.PullPolicy(r.pullPolicy), r.log); err != nil {
1✔
1398
                return err
×
1399
        }
×
1400
        if err := r.setJobCommon(cron, cronJob); err != nil {
1✔
1401
                return err
×
1402
        }
×
1403
        return nil
1✔
1404
}
1405

1406
func (r *DataImportCronReconciler) newInitialJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) (*batchv1.Job, error) {
1✔
1407
        job := &batchv1.Job{
1✔
1408
                ObjectMeta: metav1.ObjectMeta{
1✔
1409
                        Name:      GetInitialJobName(cron),
1✔
1410
                        Namespace: cronJob.Namespace,
1✔
1411
                },
1✔
1412
                Spec: cronJob.Spec.JobTemplate.Spec,
1✔
1413
        }
1✔
1414
        if err := r.setJobCommon(cron, job); err != nil {
1✔
1415
                return nil, err
×
1416
        }
×
1417
        return job, nil
1✔
1418
}
1419

1420
func (r *DataImportCronReconciler) setJobCommon(cron *cdiv1.DataImportCron, obj metav1.Object) error {
1✔
1421
        if err := operator.SetOwnerRuntime(r.uncachedClient, obj); err != nil {
1✔
1422
                return err
×
1423
        }
×
1424
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1425
        labels := obj.GetLabels()
1✔
1426
        labels[common.DataImportCronNsLabel] = cron.Namespace
1✔
1427
        labels[common.DataImportCronLabel] = cron.Name
1✔
1428
        obj.SetLabels(labels)
1✔
1429
        return nil
1✔
1430
}
1431

1432
func (r *DataImportCronReconciler) newSourceDataVolume(cron *cdiv1.DataImportCron, dataVolumeName string) *cdiv1.DataVolume {
1✔
1433
        var digestedURL string
1✔
1434
        dv := cron.Spec.Template.DeepCopy()
1✔
1435
        if isURLSource(cron) {
2✔
1436
                digestedURL = untagDigestedDockerURL(*dv.Spec.Source.Registry.URL + "@" + cron.Annotations[AnnSourceDesiredDigest])
1✔
1437
        } else if isImageStreamSource(cron) {
3✔
1438
                // No way to import image stream by name when we want specific digest, so we use its docker reference
1✔
1439
                digestedURL = "docker://" + cron.Annotations[AnnImageStreamDockerRef]
1✔
1440
                dv.Spec.Source.Registry.ImageStream = nil
1✔
1441
        }
1✔
1442
        dv.Spec.Source.Registry.URL = &digestedURL
1✔
1443
        dv.Name = dataVolumeName
1✔
1444
        dv.Namespace = cron.Namespace
1✔
1445
        r.setDataImportCronResourceLabels(cron, dv)
1✔
1446
        cc.AddAnnotation(dv, cc.AnnImmediateBinding, "true")
1✔
1447
        cc.AddAnnotation(dv, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
1✔
1448
        passCronAnnotationToDv(cron, dv, cc.AnnPodRetainAfterCompletion)
1✔
1449

1✔
1450
        for _, defaultInstanceTypeLabel := range cc.DefaultInstanceTypeLabels {
2✔
1451
                passCronLabelToDv(cron, dv, defaultInstanceTypeLabel)
1✔
1452
        }
1✔
1453

1454
        passCronLabelToDv(cron, dv, cc.LabelDynamicCredentialSupport)
1✔
1455

1✔
1456
        return dv
1✔
1457
}
1458

1459
func (r *DataImportCronReconciler) setDataImportCronResourceLabels(cron *cdiv1.DataImportCron, obj metav1.Object) {
1✔
1460
        util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
1✔
1461
        labels := obj.GetLabels()
1✔
1462
        labels[common.DataImportCronLabel] = cron.Name
1✔
1463
        if cron.Spec.RetentionPolicy != nil && *cron.Spec.RetentionPolicy == cdiv1.DataImportCronRetainNone {
2✔
1464
                labels[common.DataImportCronCleanupLabel] = "true"
1✔
1465
        }
1✔
1466
        obj.SetLabels(labels)
1✔
1467
}
1468

1469
func untagDigestedDockerURL(dockerURL string) string {
1✔
1470
        if u, err := url.Parse(dockerURL); err == nil {
2✔
1471
                url := u.Host + u.Path
1✔
1472
                subs := reference.ReferenceRegexp.FindStringSubmatch(url)
1✔
1473
                // Check for tag
1✔
1474
                if len(subs) > 2 && len(subs[2]) > 0 {
2✔
1475
                        if untaggedRef, err := reference.ParseDockerRef(url); err == nil {
2✔
1476
                                return u.Scheme + "://" + untaggedRef.String()
1✔
1477
                        }
1✔
1478
                }
1479
        }
1480
        return dockerURL
1✔
1481
}
1482

1483
func passCronLabelToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1484
        if val := cron.Labels[ann]; val != "" {
2✔
1485
                cc.AddLabel(dv, ann, val)
1✔
1486
        }
1✔
1487
}
1488

1489
func passCronAnnotationToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
1✔
1490
        if val := cron.Annotations[ann]; val != "" {
1✔
1491
                cc.AddAnnotation(dv, ann, val)
×
1492
        }
×
1493
}
1494

1495
func (r *DataImportCronReconciler) newDataSource(cron *cdiv1.DataImportCron) *cdiv1.DataSource {
1✔
1496
        dataSource := &cdiv1.DataSource{
1✔
1497
                ObjectMeta: metav1.ObjectMeta{
1✔
1498
                        Name:      cron.Spec.ManagedDataSource,
1✔
1499
                        Namespace: cron.Namespace,
1✔
1500
                },
1✔
1501
        }
1✔
1502
        util.SetRecommendedLabels(dataSource, r.installerLabels, common.CDIControllerName)
1✔
1503
        dataSource.Labels[common.DataImportCronLabel] = cron.Name
1✔
1504
        return dataSource
1✔
1505
}
1✔
1506

1507
// Create DataVolume name based on the DataSource name + prefix of the digest sha256
1508
func createDvName(prefix, digest string) (string, error) {
1✔
1509
        fromIdx := len(digestPrefix)
1✔
1510
        toIdx := fromIdx + digestDvNameSuffixLength
1✔
1511
        if !strings.HasPrefix(digest, digestPrefix) {
2✔
1512
                return "", errors.Errorf("Digest has no supported prefix")
1✔
1513
        }
1✔
1514
        if len(digest) < toIdx {
2✔
1515
                return "", errors.Errorf("Digest is too short")
1✔
1516
        }
1✔
1517
        return naming.GetResourceName(prefix, digest[fromIdx:toIdx]), nil
1✔
1518
}
1519

1520
// GetCronJobName get CronJob name based on cron name and UID
1521
func GetCronJobName(cron *cdiv1.DataImportCron) string {
1✔
1522
        return naming.GetResourceName(cron.Name, string(cron.UID)[:cronJobUIDSuffixLength])
1✔
1523
}
1✔
1524

1525
// GetInitialJobName get initial job name based on cron name and UID
1526
func GetInitialJobName(cron *cdiv1.DataImportCron) string {
1✔
1527
        return naming.GetResourceName("initial-job", GetCronJobName(cron))
1✔
1528
}
1✔
1529

1530
func getSelector(matchLabels map[string]string) (labels.Selector, error) {
1✔
1531
        return metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: matchLabels})
1✔
1532
}
1✔
1533

1534
func inferVolumeModeForSnapshot(ctx context.Context, client client.Client, cron *cdiv1.DataImportCron) (*corev1.PersistentVolumeMode, error) {
1✔
1535
        dv := &cron.Spec.Template
1✔
1536

1✔
1537
        if explicitVolumeMode := getVolumeModeFromDVSpec(dv); explicitVolumeMode != nil {
1✔
1538
                return explicitVolumeMode, nil
×
1539
        }
×
1540

1541
        accessModes := getAccessModesFromDVSpec(dv)
1✔
1542
        inferredPvc := &corev1.PersistentVolumeClaim{
1✔
1543
                Spec: corev1.PersistentVolumeClaimSpec{
1✔
1544
                        StorageClassName: cc.GetStorageClassFromDVSpec(dv),
1✔
1545
                        AccessModes:      accessModes,
1✔
1546
                        VolumeMode:       ptr.To(cdiv1.PersistentVolumeFromStorageProfile),
1✔
1547
                        Resources: corev1.VolumeResourceRequirements{
1✔
1548
                                Requests: corev1.ResourceList{
1✔
1549
                                        // Doesn't matter
1✔
1550
                                        corev1.ResourceStorage: resource.MustParse("1Gi"),
1✔
1551
                                },
1✔
1552
                        },
1✔
1553
                },
1✔
1554
        }
1✔
1555
        if err := dvc.RenderPvc(ctx, client, inferredPvc); err != nil {
1✔
1556
                return nil, err
×
1557
        }
×
1558

1559
        return inferredPvc.Spec.VolumeMode, nil
1✔
1560
}
1561

1562
// getVolumeModeFromDVSpec returns the volume mode from DataVolume PVC or Storage spec
1563
func getVolumeModeFromDVSpec(dv *cdiv1.DataVolume) *corev1.PersistentVolumeMode {
1✔
1564
        if dv.Spec.PVC != nil {
1✔
1565
                return dv.Spec.PVC.VolumeMode
×
1566
        }
×
1567

1568
        if dv.Spec.Storage != nil {
2✔
1569
                return dv.Spec.Storage.VolumeMode
1✔
1570
        }
1✔
1571

1572
        return nil
×
1573
}
1574

1575
// getAccessModesFromDVSpec returns the access modes from DataVolume PVC or Storage spec
1576
func getAccessModesFromDVSpec(dv *cdiv1.DataVolume) []corev1.PersistentVolumeAccessMode {
1✔
1577
        if dv.Spec.PVC != nil {
1✔
1578
                return dv.Spec.PVC.AccessModes
×
1579
        }
×
1580

1581
        if dv.Spec.Storage != nil {
2✔
1582
                return dv.Spec.Storage.AccessModes
1✔
1583
        }
1✔
1584

1585
        return nil
×
1586
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc