• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5491

17 Jul 2025 01:25AM UTC coverage: 59.326% (-0.2%) from 59.502%
#5491

push

travis-ci

web-flow
Populate DV with events from PVC Prime (#3764)

* update role for controller so it can get list of events

Signed-off-by: dsanatar <dsanatar@redhat.com>

* add new field index for events so we can filter by the object's name. add new function that gets all events associated with a primePvc and re-emits them for the regular pvc

Signed-off-by: dsanatar <dsanatar@redhat.com>

* add watcher for Events filtered by PVC type. modify copyEvent func to only emit unique events from primePVC

Signed-off-by: dsanatar <dsanatar@redhat.com>

* add new field index for event uids so we can filter accordingly

Signed-off-by: dsanatar <dsanatar@redhat.com>

* sort events by most recent timestamp so we can loop more efficiently to emit new events

Signed-off-by: dsanatar <dsanatar@redhat.com>

* fix linting

Signed-off-by: dsanatar <dsanatar@redhat.com>

* modify watcher to filter on only prime pvc events. move copyEvents to ReconcileTargetPvc func. modify copyEvents logic to handle edge case where events have same timestamps

Signed-off-by: dsanatar <dsanatar@redhat.com>

* add missing bracket

Signed-off-by: dsanatar <dsanatar@redhat.com>

* modify CopyEvents func to take in two client.Objects instead so we can reuse the same func when we need to copy events from primePvc to pvc and from pvc to dv

Signed-off-by: dsanatar <dsanatar@redhat.com>

* move func call to CopyEvents to emitEvents func so it only occurs when DVs status has changed

Signed-off-by: dsanatar <dsanatar@redhat.com>

* add conditional to CopyEvents so when we are handling DVs we only copy over events from the primePVC

Signed-off-by: dsanatar <dsanatar@redhat.com>

* remove debug logs

Signed-off-by: dsanatar <dsanatar@redhat.com>

* reuse existing function to add pvcPrime name annotation to import populator so that we can access the prime name downstream

Signed-off-by: dsanatar <dsanatar@redhat.com>

* update DV bound condition to include PVC Prime name if one exists while the claim is stil... (continued)

75 of 206 new or added lines in 11 files covered. (36.41%)

5 existing lines in 2 files now uncovered.

17163 of 28930 relevant lines covered (59.33%)

0.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

14.26
/pkg/controller/common/util.go
1
/*
2
Copyright 2022 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
        http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package common
18

19
import (
20
        "context"
21
        "crypto/rand"
22
        "crypto/rsa"
23
        "crypto/tls"
24
        "fmt"
25
        "io"
26
        "net"
27
        "net/http"
28
        "reflect"
29
        "regexp"
30
        "sort"
31
        "strconv"
32
        "strings"
33
        "sync"
34
        "time"
35

36
        "github.com/go-logr/logr"
37
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
38
        ocpconfigv1 "github.com/openshift/api/config/v1"
39
        "github.com/pkg/errors"
40

41
        corev1 "k8s.io/api/core/v1"
42
        storagev1 "k8s.io/api/storage/v1"
43
        extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
44
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
45
        "k8s.io/apimachinery/pkg/api/meta"
46
        "k8s.io/apimachinery/pkg/api/resource"
47
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
48
        "k8s.io/apimachinery/pkg/labels"
49
        "k8s.io/apimachinery/pkg/runtime"
50
        "k8s.io/apimachinery/pkg/types"
51
        "k8s.io/apimachinery/pkg/util/sets"
52
        "k8s.io/client-go/tools/cache"
53
        "k8s.io/client-go/tools/record"
54
        "k8s.io/klog/v2"
55
        "k8s.io/utils/ptr"
56

57
        runtimecache "sigs.k8s.io/controller-runtime/pkg/cache"
58
        "sigs.k8s.io/controller-runtime/pkg/client"
59
        "sigs.k8s.io/controller-runtime/pkg/client/fake"
60

61
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
62
        cdiv1utils "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1/utils"
63
        "kubevirt.io/containerized-data-importer/pkg/client/clientset/versioned/scheme"
64
        "kubevirt.io/containerized-data-importer/pkg/common"
65
        featuregates "kubevirt.io/containerized-data-importer/pkg/feature-gates"
66
        "kubevirt.io/containerized-data-importer/pkg/token"
67
        "kubevirt.io/containerized-data-importer/pkg/util"
68
        sdkapi "kubevirt.io/controller-lifecycle-operator-sdk/api"
69
)
70

71
const (
72
        // DataVolName provides a const to use for creating volumes in pod specs
73
        DataVolName = "cdi-data-vol"
74

75
        // ScratchVolName provides a const to use for creating scratch pvc volumes in pod specs
76
        ScratchVolName = "cdi-scratch-vol"
77

78
        // AnnAPIGroup is the APIGroup for CDI
79
        AnnAPIGroup = "cdi.kubevirt.io"
80
        // AnnCreatedBy is a pod annotation indicating if the pod was created by the PVC
81
        AnnCreatedBy = AnnAPIGroup + "/storage.createdByController"
82
        // AnnPodPhase is a PVC annotation indicating the related pod progress (phase)
83
        AnnPodPhase = AnnAPIGroup + "/storage.pod.phase"
84
        // AnnPodReady tells whether the pod is ready
85
        AnnPodReady = AnnAPIGroup + "/storage.pod.ready"
86
        // AnnPodRestarts is a PVC annotation that tells how many times a related pod was restarted
87
        AnnPodRestarts = AnnAPIGroup + "/storage.pod.restarts"
88
        // AnnPodSchedulable is a PVC annotation that tells if the Pod is schedulable or not
89
        AnnPodSchedulable = AnnAPIGroup + "/storage.pod.schedulable"
90
        // AnnPopulatedFor is a PVC annotation telling the datavolume controller that the PVC is already populated
91
        AnnPopulatedFor = AnnAPIGroup + "/storage.populatedFor"
92
        // AnnPrePopulated is a PVC annotation telling the datavolume controller that the PVC is already populated
93
        AnnPrePopulated = AnnAPIGroup + "/storage.prePopulated"
94
        // AnnPriorityClassName is PVC annotation to indicate the priority class name for importer, cloner and uploader pod
95
        AnnPriorityClassName = AnnAPIGroup + "/storage.pod.priorityclassname"
96
        // AnnExternalPopulation annotation marks a PVC as "externally populated", allowing the import-controller to skip it
97
        AnnExternalPopulation = AnnAPIGroup + "/externalPopulation"
98

99
        // AnnPodRetainAfterCompletion is PVC annotation for retaining transfer pods after completion
100
        AnnPodRetainAfterCompletion = AnnAPIGroup + "/storage.pod.retainAfterCompletion"
101

102
        // AnnPreviousCheckpoint provides a const to indicate the previous snapshot for a multistage import
103
        AnnPreviousCheckpoint = AnnAPIGroup + "/storage.checkpoint.previous"
104
        // AnnCurrentCheckpoint provides a const to indicate the current snapshot for a multistage import
105
        AnnCurrentCheckpoint = AnnAPIGroup + "/storage.checkpoint.current"
106
        // AnnFinalCheckpoint provides a const to indicate whether the current checkpoint is the last one
107
        AnnFinalCheckpoint = AnnAPIGroup + "/storage.checkpoint.final"
108
        // AnnCheckpointsCopied is a prefix for recording which checkpoints have already been copied
109
        AnnCheckpointsCopied = AnnAPIGroup + "/storage.checkpoint.copied"
110

111
        // AnnCurrentPodID keeps track of the latest pod servicing this PVC
112
        AnnCurrentPodID = AnnAPIGroup + "/storage.checkpoint.pod.id"
113
        // AnnMultiStageImportDone marks a multi-stage import as totally finished
114
        AnnMultiStageImportDone = AnnAPIGroup + "/storage.checkpoint.done"
115

116
        // AnnPopulatorProgress is a standard annotation that can be used for progress reporting
117
        AnnPopulatorProgress = AnnAPIGroup + "/storage.populator.progress"
118

119
        // AnnPreallocationRequested provides a const to indicate whether preallocation should be performed on the PV
120
        AnnPreallocationRequested = AnnAPIGroup + "/storage.preallocation.requested"
121
        // AnnPreallocationApplied provides a const for PVC preallocation annotation
122
        AnnPreallocationApplied = AnnAPIGroup + "/storage.preallocation"
123

124
        // AnnRunningCondition provides a const for the running condition
125
        AnnRunningCondition = AnnAPIGroup + "/storage.condition.running"
126
        // AnnRunningConditionMessage provides a const for the running condition message
127
        AnnRunningConditionMessage = AnnAPIGroup + "/storage.condition.running.message"
128
        // AnnRunningConditionReason provides a const for the running condition reason
129
        AnnRunningConditionReason = AnnAPIGroup + "/storage.condition.running.reason"
130

131
        // AnnBoundCondition provides a const for the bound condition
132
        AnnBoundCondition = AnnAPIGroup + "/storage.condition.bound"
133
        // AnnBoundConditionMessage provides a const for the bound condition message
134
        AnnBoundConditionMessage = AnnAPIGroup + "/storage.condition.bound.message"
135
        // AnnBoundConditionReason provides a const for the bound condition reason
136
        AnnBoundConditionReason = AnnAPIGroup + "/storage.condition.bound.reason"
137

138
        // AnnSourceRunningCondition provides a const for the source running condition
139
        AnnSourceRunningCondition = AnnAPIGroup + "/storage.condition.source.running"
140
        // AnnSourceRunningConditionMessage provides a const for the source running condition message
141
        AnnSourceRunningConditionMessage = AnnAPIGroup + "/storage.condition.source.running.message"
142
        // AnnSourceRunningConditionReason provides a const for the source running condition reason
143
        AnnSourceRunningConditionReason = AnnAPIGroup + "/storage.condition.source.running.reason"
144

145
        // AnnVddkVersion shows the last VDDK library version used by a DV's importer pod
146
        AnnVddkVersion = AnnAPIGroup + "/storage.pod.vddk.version"
147
        // AnnVddkHostConnection shows the last ESX host that serviced a DV's importer pod
148
        AnnVddkHostConnection = AnnAPIGroup + "/storage.pod.vddk.host"
149
        // AnnVddkInitImageURL saves a per-DV VDDK image URL on the PVC
150
        AnnVddkInitImageURL = AnnAPIGroup + "/storage.pod.vddk.initimageurl"
151
        // AnnVddkExtraArgs references a ConfigMap that holds arguments to pass directly to the VDDK library
152
        AnnVddkExtraArgs = AnnAPIGroup + "/storage.pod.vddk.extraargs"
153

154
        // AnnRequiresScratch provides a const for our PVC requiring scratch annotation
155
        AnnRequiresScratch = AnnAPIGroup + "/storage.import.requiresScratch"
156

157
        // AnnRequiresDirectIO provides a const for our PVC requiring direct io annotation (due to OOMs we need to try qemu cache=none)
158
        AnnRequiresDirectIO = AnnAPIGroup + "/storage.import.requiresDirectIo"
159
        // OOMKilledReason provides a value that container runtimes must return in the reason field for an OOMKilled container
160
        OOMKilledReason = "OOMKilled"
161

162
        // AnnContentType provides a const for the PVC content-type
163
        AnnContentType = AnnAPIGroup + "/storage.contentType"
164

165
        // AnnSource provide a const for our PVC import source annotation
166
        AnnSource = AnnAPIGroup + "/storage.import.source"
167
        // AnnEndpoint provides a const for our PVC endpoint annotation
168
        AnnEndpoint = AnnAPIGroup + "/storage.import.endpoint"
169

170
        // AnnSecret provides a const for our PVC secretName annotation
171
        AnnSecret = AnnAPIGroup + "/storage.import.secretName"
172
        // AnnCertConfigMap is the name of a configmap containing tls certs
173
        AnnCertConfigMap = AnnAPIGroup + "/storage.import.certConfigMap"
174
        // AnnRegistryImportMethod provides a const for registry import method annotation
175
        AnnRegistryImportMethod = AnnAPIGroup + "/storage.import.registryImportMethod"
176
        // AnnRegistryImageStream provides a const for registry image stream annotation
177
        AnnRegistryImageStream = AnnAPIGroup + "/storage.import.registryImageStream"
178
        // AnnImportPod provides a const for our PVC importPodName annotation
179
        AnnImportPod = AnnAPIGroup + "/storage.import.importPodName"
180
        // AnnDiskID provides a const for our PVC diskId annotation
181
        AnnDiskID = AnnAPIGroup + "/storage.import.diskId"
182
        // AnnUUID provides a const for our PVC uuid annotation
183
        AnnUUID = AnnAPIGroup + "/storage.import.uuid"
184
        // AnnBackingFile provides a const for our PVC backing file annotation
185
        AnnBackingFile = AnnAPIGroup + "/storage.import.backingFile"
186
        // AnnThumbprint provides a const for our PVC backing thumbprint annotation
187
        AnnThumbprint = AnnAPIGroup + "/storage.import.vddk.thumbprint"
188
        // AnnExtraHeaders provides a const for our PVC extraHeaders annotation
189
        AnnExtraHeaders = AnnAPIGroup + "/storage.import.extraHeaders"
190
        // AnnSecretExtraHeaders provides a const for our PVC secretExtraHeaders annotation
191
        AnnSecretExtraHeaders = AnnAPIGroup + "/storage.import.secretExtraHeaders"
192
        // AnnRegistryImageArchitecture provides a const for our PVC registryImageArchitecture annotation
193
        AnnRegistryImageArchitecture = AnnAPIGroup + "/storage.import.registryImageArchitecture"
194

195
        // AnnCloneToken is the annotation containing the clone token
196
        AnnCloneToken = AnnAPIGroup + "/storage.clone.token"
197
        // AnnExtendedCloneToken is the annotation containing the long term clone token
198
        AnnExtendedCloneToken = AnnAPIGroup + "/storage.extended.clone.token"
199
        // AnnPermissiveClone annotation allows the clone-controller to skip the clone size validation
200
        AnnPermissiveClone = AnnAPIGroup + "/permissiveClone"
201
        // AnnOwnerUID annotation has the owner UID
202
        AnnOwnerUID = AnnAPIGroup + "/ownerUID"
203
        // AnnCloneType is the computed/requested clone type
204
        AnnCloneType = AnnAPIGroup + "/cloneType"
205
        // AnnCloneSourcePod name of the source clone pod
206
        AnnCloneSourcePod = AnnAPIGroup + "/storage.sourceClonePodName"
207

208
        // AnnUploadRequest marks that a PVC should be made available for upload
209
        AnnUploadRequest = AnnAPIGroup + "/storage.upload.target"
210

211
        // AnnCheckStaticVolume checks if a statically allocated PV exists before creating the target PVC.
212
        // If so, PVC is still created but population is skipped
213
        AnnCheckStaticVolume = AnnAPIGroup + "/storage.checkStaticVolume"
214

215
        // AnnPersistentVolumeList is an annotation storing a list of PV names
216
        AnnPersistentVolumeList = AnnAPIGroup + "/storage.persistentVolumeList"
217

218
        // AnnPopulatorKind annotation is added to a PVC' to specify the population kind, so it's later
219
        // checked by the common populator watches.
220
        AnnPopulatorKind = AnnAPIGroup + "/storage.populator.kind"
221
        // AnnUsePopulator annotation indicates if the datavolume population will use populators
222
        AnnUsePopulator = AnnAPIGroup + "/storage.usePopulator"
223

224
        // AnnMinimumSupportedPVCSize annotation on a StorageProfile specifies its minimum supported PVC size
225
        AnnMinimumSupportedPVCSize = AnnAPIGroup + "/minimumSupportedPvcSize"
226

227
        // AnnDefaultStorageClass is the annotation indicating that a storage class is the default one
228
        AnnDefaultStorageClass = "storageclass.kubernetes.io/is-default-class"
229
        // AnnDefaultVirtStorageClass is the annotation indicating that a storage class is the default one for virtualization purposes
230
        AnnDefaultVirtStorageClass = "storageclass.kubevirt.io/is-default-virt-class"
231
        // AnnDefaultSnapshotClass is the annotation indicating that a snapshot class is the default one
232
        AnnDefaultSnapshotClass = "snapshot.storage.kubernetes.io/is-default-class"
233

234
        // AnnSourceVolumeMode is the volume mode of the source PVC specified as an annotation on snapshots
235
        AnnSourceVolumeMode = AnnAPIGroup + "/storage.import.sourceVolumeMode"
236

237
        // AnnOpenShiftImageLookup is the annotation for OpenShift image stream lookup
238
        AnnOpenShiftImageLookup = "alpha.image.policy.openshift.io/resolve-names"
239

240
        // AnnCloneRequest sets our expected annotation for a CloneRequest
241
        AnnCloneRequest = "k8s.io/CloneRequest"
242
        // AnnCloneOf is used to indicate that cloning was complete
243
        AnnCloneOf = "k8s.io/CloneOf"
244

245
        // AnnPodNetwork is used for specifying Pod Network
246
        AnnPodNetwork = "k8s.v1.cni.cncf.io/networks"
247
        // AnnPodMultusDefaultNetwork is used for specifying default Pod Network
248
        AnnPodMultusDefaultNetwork = "v1.multus-cni.io/default-network"
249
        // AnnPodSidecarInjectionIstio is used for enabling/disabling Pod istio/AspenMesh sidecar injection
250
        AnnPodSidecarInjectionIstio = "sidecar.istio.io/inject"
251
        // AnnPodSidecarInjectionIstioDefault is the default value passed for AnnPodSidecarInjectionIstio
252
        AnnPodSidecarInjectionIstioDefault = "false"
253
        // AnnPodSidecarInjectionLinkerd is used to enable/disable linkerd sidecar injection
254
        AnnPodSidecarInjectionLinkerd = "linkerd.io/inject"
255
        // AnnPodSidecarInjectionLinkerdDefault is the default value passed for AnnPodSidecarInjectionLinkerd
256
        AnnPodSidecarInjectionLinkerdDefault = "disabled"
257

258
        // AnnImmediateBinding provides a const to indicate whether immediate binding should be performed on the PV (overrides global config)
259
        AnnImmediateBinding = AnnAPIGroup + "/storage.bind.immediate.requested"
260

261
        // AnnSelectedNode annotation is added to a PVC that has been triggered by scheduler to
262
        // be dynamically provisioned. Its value is the name of the selected node.
263
        AnnSelectedNode = "volume.kubernetes.io/selected-node"
264

265
        // CloneUniqueID is used as a special label to be used when we search for the pod
266
        CloneUniqueID = AnnAPIGroup + "/storage.clone.cloneUniqeId"
267

268
        // CloneSourceInUse is reason for event created when clone source pvc is in use
269
        CloneSourceInUse = "CloneSourceInUse"
270

271
        // CloneComplete message
272
        CloneComplete = "Clone Complete"
273

274
        cloneTokenLeeway = 10 * time.Second
275

276
        // Default value for preallocation option if not defined in DV or CDIConfig
277
        defaultPreallocation = false
278

279
        // ErrStartingPod provides a const to indicate that a pod wasn't able to start without providing sensitive information (reason)
280
        ErrStartingPod = "ErrStartingPod"
281
        // MessageErrStartingPod provides a const to indicate that a pod wasn't able to start without providing sensitive information (message)
282
        MessageErrStartingPod = "Error starting pod '%s': For more information, request access to cdi-deploy logs from your sysadmin"
283
        // ErrClaimNotValid provides a const to indicate a claim is not valid
284
        ErrClaimNotValid = "ErrClaimNotValid"
285
        // ErrExceededQuota provides a const to indicate the claim has exceeded the quota
286
        ErrExceededQuota = "ErrExceededQuota"
287
        // ErrIncompatiblePVC provides a const to indicate a clone is not possible due to an incompatible PVC
288
        ErrIncompatiblePVC = "ErrIncompatiblePVC"
289

290
        // SourceHTTP is the source type HTTP, if unspecified or invalid, it defaults to SourceHTTP
291
        SourceHTTP = "http"
292
        // SourceS3 is the source type S3
293
        SourceS3 = "s3"
294
        // SourceGCS is the source type GCS
295
        SourceGCS = "gcs"
296
        // SourceGlance is the source type of glance
297
        SourceGlance = "glance"
298
        // SourceNone means there is no source.
299
        SourceNone = "none"
300
        // SourceRegistry is the source type of Registry
301
        SourceRegistry = "registry"
302
        // SourceImageio is the source type ovirt-imageio
303
        SourceImageio = "imageio"
304
        // SourceVDDK is the source type of VDDK
305
        SourceVDDK = "vddk"
306

307
        // VolumeSnapshotClassSelected reports that a VolumeSnapshotClass was selected
308
        VolumeSnapshotClassSelected = "VolumeSnapshotClassSelected"
309
        // MessageStorageProfileVolumeSnapshotClassSelected reports that a VolumeSnapshotClass was selected according to StorageProfile
310
        MessageStorageProfileVolumeSnapshotClassSelected = "VolumeSnapshotClass selected according to StorageProfile"
311
        // MessageDefaultVolumeSnapshotClassSelected reports that the default VolumeSnapshotClass was selected
312
        MessageDefaultVolumeSnapshotClassSelected = "Default VolumeSnapshotClass selected"
313
        // MessageFirstVolumeSnapshotClassSelected reports that the first VolumeSnapshotClass was selected
314
        MessageFirstVolumeSnapshotClassSelected = "First VolumeSnapshotClass selected"
315

316
        // ClaimLost reason const
317
        ClaimLost = "ClaimLost"
318
        // NotFound reason const
319
        NotFound = "NotFound"
320

321
        // LabelDefaultInstancetype provides a default VirtualMachine{ClusterInstancetype,Instancetype} that can be used by a VirtualMachine booting from a given PVC
322
        LabelDefaultInstancetype = "instancetype.kubevirt.io/default-instancetype"
323
        // LabelDefaultInstancetypeKind provides a default kind of either VirtualMachineClusterInstancetype or VirtualMachineInstancetype
324
        LabelDefaultInstancetypeKind = "instancetype.kubevirt.io/default-instancetype-kind"
325
        // LabelDefaultPreference provides a default VirtualMachine{ClusterPreference,Preference} that can be used by a VirtualMachine booting from a given PVC
326
        LabelDefaultPreference = "instancetype.kubevirt.io/default-preference"
327
        // LabelDefaultPreferenceKind provides a default kind of either VirtualMachineClusterPreference or VirtualMachinePreference
328
        LabelDefaultPreferenceKind = "instancetype.kubevirt.io/default-preference-kind"
329

330
        // LabelDynamicCredentialSupport specifies if the OS supports updating credentials at runtime.
331
        //nolint:gosec // These are not credentials
332
        LabelDynamicCredentialSupport = "kubevirt.io/dynamic-credentials-support"
333

334
        // LabelExcludeFromVeleroBackup provides a const to indicate whether an object should be excluded from velero backup
335
        LabelExcludeFromVeleroBackup = "velero.io/exclude-from-backup"
336

337
        // ProgressDone this means we are DONE
338
        ProgressDone = "100.0%"
339

340
        // AnnEventSourceKind is the source kind that should be related to events
341
        AnnEventSourceKind = AnnAPIGroup + "/events.source.kind"
342
        // AnnEventSource is the source that should be related to events (namespace/name)
343
        AnnEventSource = AnnAPIGroup + "/events.source"
344

345
        // AnnAllowClaimAdoption is the annotation that allows a claim to be adopted by a DataVolume
346
        AnnAllowClaimAdoption = AnnAPIGroup + "/allowClaimAdoption"
347

348
        // AnnCdiCustomizeComponentHash annotation is a hash of all customizations that live under spec.CustomizeComponents
349
        AnnCdiCustomizeComponentHash = AnnAPIGroup + "/customizer-identifier"
350

351
        // AnnCreatedForDataVolume stores the UID of the datavolume that the PVC was created for
352
        AnnCreatedForDataVolume = AnnAPIGroup + "/createdForDataVolume"
353

354
        // AnnPVCPrimeName annotation is the name of the PVC' that is used to populate the PV which is then rebound to the target PVC
355
        AnnPVCPrimeName = AnnAPIGroup + "/storage.populator.pvcPrime"
356
)
357

358
// Size-detection pod error codes.
// The values are assigned sequentially by iota, starting at 0 for NoErr.
// NOTE(review): the per-code meanings below are inferred from the identifier
// names only — confirm against the size-detection pod implementation.
359
const (
360
        // NoErr is 0; presumably the "no error" code.
        NoErr int = iota
361
        // ErrBadArguments is 1; presumably reported for invalid arguments.
        ErrBadArguments
362
        // ErrInvalidFile is 2; presumably reported for an invalid file.
        ErrInvalidFile
363
        // ErrInvalidPath is 3; presumably reported for an invalid path.
        ErrInvalidPath
364
        // ErrBadTermFile is 4; presumably reported for a bad termination file.
        ErrBadTermFile
365
        // ErrUnknown is 5; presumably the catch-all code.
        ErrUnknown
366
)
367

368
var (
369
        // BlockMode is raw block device mode
370
        BlockMode = corev1.PersistentVolumeBlock
371
        // FilesystemMode is filesystem device mode
372
        FilesystemMode = corev1.PersistentVolumeFilesystem
373

374
        // DefaultInstanceTypeLabels is a list of currently supported default instance type labels
375
        DefaultInstanceTypeLabels = []string{
376
                LabelDefaultInstancetype,
377
                LabelDefaultInstancetypeKind,
378
                LabelDefaultPreference,
379
                LabelDefaultPreferenceKind,
380
        }
381

382
        apiServerKeyOnce sync.Once
383
        apiServerKey     *rsa.PrivateKey
384

385
        // allowedAnnotations is a list of annotations
386
        // that can be propagated from the pvc/dv to a pod
387
        allowedAnnotations = map[string]string{
388
                AnnPodNetwork:                 "",
389
                AnnPodSidecarInjectionIstio:   AnnPodSidecarInjectionIstioDefault,
390
                AnnPodSidecarInjectionLinkerd: AnnPodSidecarInjectionLinkerdDefault,
391
                AnnPriorityClassName:          "",
392
                AnnPodMultusDefaultNetwork:    "",
393
        }
394

395
        validLabelsMatch = regexp.MustCompile(`^([\w.]+\.kubevirt.io|kubevirt.io)/[\w-]+$`)
396

397
        ErrDataSourceMaxDepthReached = errors.New("DataSource reference chain exceeds maximum depth of 1")
398
        ErrDataSourceSelfReference   = errors.New("DataSource cannot self-reference")
399
)
400

401
// FakeValidator is a fake token validator.
// Its Validate method accepts exactly one token value (Match) and answers
// with a fixed payload built from the fields below.
402
type FakeValidator struct {
403
        // Match is the only token value that Validate accepts.
        Match     string
404
        // Operation is not read by Validate, which always reports
        // token.OperationClone.
        Operation token.Operation
405
        // Name is copied into the returned payload's Name.
        Name      string
406
        // Namespace is copied into the returned payload's Namespace.
        Namespace string
407
        // Resource is not read by Validate, which always reports the
        // "persistentvolumeclaims" resource.
        Resource  metav1.GroupVersionResource
408
        // Params is passed through as the returned payload's Params.
        Params    map[string]string
409
}
410

411
// Validate is a fake token validation
412
func (v *FakeValidator) Validate(value string) (*token.Payload, error) {
×
413
        if value != v.Match {
×
414
                return nil, fmt.Errorf("token does not match expected")
×
415
        }
×
416
        resource := metav1.GroupVersionResource{
×
417
                Resource: "persistentvolumeclaims",
×
418
        }
×
419
        return &token.Payload{
×
420
                Name:      v.Name,
×
421
                Namespace: v.Namespace,
×
422
                Operation: token.OperationClone,
×
423
                Resource:  resource,
×
424
                Params:    v.Params,
×
425
        }, nil
×
426
}
427

428
// MultiTokenValidator is a token validator that can validate both short and long tokens
429
type MultiTokenValidator struct {
430
        // ShortTokenValidator is used for the token stored under AnnCloneToken,
        // i.e. when the PVC has no AnnExtendedCloneToken annotation.
        ShortTokenValidator token.Validator
431
        // LongTokenValidator is used for the token stored under
        // AnnExtendedCloneToken.
        LongTokenValidator  token.Validator
432
}
433

434
// ValidatePVC validates a PVC
435
func (mtv *MultiTokenValidator) ValidatePVC(source, target *corev1.PersistentVolumeClaim) error {
×
436
        tok, v := mtv.getTokenAndValidator(target)
×
437
        return ValidateCloneTokenPVC(tok, v, source, target)
×
438
}
×
439

440
// ValidatePopulator valades a token for a populator
441
func (mtv *MultiTokenValidator) ValidatePopulator(vcs *cdiv1.VolumeCloneSource, pvc *corev1.PersistentVolumeClaim) error {
×
442
        if vcs.Namespace == pvc.Namespace {
×
443
                return nil
×
444
        }
×
445

446
        tok, v := mtv.getTokenAndValidator(pvc)
×
447

×
448
        tokenData, err := v.Validate(tok)
×
449
        if err != nil {
×
450
                return errors.Wrap(err, "error verifying token")
×
451
        }
×
452

453
        var tokenResourceName string
×
454
        switch vcs.Spec.Source.Kind {
×
455
        case "PersistentVolumeClaim":
×
456
                tokenResourceName = "persistentvolumeclaims"
×
457
        case "VolumeSnapshot":
×
458
                tokenResourceName = "volumesnapshots"
×
459
        }
460
        srcName := vcs.Spec.Source.Name
×
461

×
462
        return validateTokenData(tokenData, vcs.Namespace, srcName, pvc.Namespace, pvc.Name, string(pvc.UID), tokenResourceName)
×
463
}
464

465
func (mtv *MultiTokenValidator) getTokenAndValidator(pvc *corev1.PersistentVolumeClaim) (string, token.Validator) {
×
466
        v := mtv.LongTokenValidator
×
467
        tok, ok := pvc.Annotations[AnnExtendedCloneToken]
×
468
        if !ok {
×
469
                // if token doesn't exist, no prob for same namespace
×
470
                tok = pvc.Annotations[AnnCloneToken]
×
471
                v = mtv.ShortTokenValidator
×
472
        }
×
473
        return tok, v
×
474
}
475

476
// NewMultiTokenValidator returns a new multi token validator
477
func NewMultiTokenValidator(key *rsa.PublicKey) *MultiTokenValidator {
×
478
        return &MultiTokenValidator{
×
479
                ShortTokenValidator: NewCloneTokenValidator(common.CloneTokenIssuer, key),
×
480
                LongTokenValidator:  NewCloneTokenValidator(common.ExtendedCloneTokenIssuer, key),
×
481
        }
×
482
}
×
483

484
// NewCloneTokenValidator returns a new token validator
485
func NewCloneTokenValidator(issuer string, key *rsa.PublicKey) token.Validator {
×
486
        return token.NewValidator(issuer, key, cloneTokenLeeway)
×
487
}
×
488

489
// GetRequestedImageSize returns the PVC requested size
490
func GetRequestedImageSize(pvc *corev1.PersistentVolumeClaim) (string, error) {
1✔
491
        pvcSize, found := pvc.Spec.Resources.Requests[corev1.ResourceStorage]
1✔
492
        if !found {
2✔
493
                return "", errors.Errorf("storage request is missing in pvc \"%s/%s\"", pvc.Namespace, pvc.Name)
1✔
494
        }
1✔
495
        return pvcSize.String(), nil
1✔
496
}
497

498
// GetVolumeMode returns the volumeMode from PVC handling default empty value
499
func GetVolumeMode(pvc *corev1.PersistentVolumeClaim) corev1.PersistentVolumeMode {
×
500
        return util.ResolveVolumeMode(pvc.Spec.VolumeMode)
×
501
}
×
502

503
// IsDataVolumeUsingDefaultStorageClass checks if the DataVolume is using the default StorageClass
504
func IsDataVolumeUsingDefaultStorageClass(dv *cdiv1.DataVolume) bool {
×
505
        return GetStorageClassFromDVSpec(dv) == nil
×
506
}
×
507

508
// GetStorageClassFromDVSpec returns the StorageClassName from DataVolume PVC or Storage spec
509
func GetStorageClassFromDVSpec(dv *cdiv1.DataVolume) *string {
×
510
        if dv.Spec.PVC != nil {
×
511
                return dv.Spec.PVC.StorageClassName
×
512
        }
×
513

514
        if dv.Spec.Storage != nil {
×
515
                return dv.Spec.Storage.StorageClassName
×
516
        }
×
517

518
        return nil
×
519
}
520

521
// getStorageClassByName looks up the storage class based on the name.
522
// If name is nil, it performs fallback to default according to the provided content type
523
// If no storage class is found, returns nil
524
func getStorageClassByName(ctx context.Context, client client.Client, name *string, contentType cdiv1.DataVolumeContentType) (*storagev1.StorageClass, error) {
1✔
525
        if name == nil {
2✔
526
                return getFallbackStorageClass(ctx, client, contentType)
1✔
527
        }
1✔
528

529
        // look up storage class by name
530
        storageClass := &storagev1.StorageClass{}
×
531
        if err := client.Get(ctx, types.NamespacedName{Name: *name}, storageClass); err != nil {
×
532
                if k8serrors.IsNotFound(err) {
×
533
                        return nil, nil
×
534
                }
×
535
                klog.V(3).Info("Unable to retrieve storage class", "storage class name", *name)
×
536
                return nil, errors.Errorf("unable to retrieve storage class %s", *name)
×
537
        }
538

539
        return storageClass, nil
×
540
}
541

542
// GetStorageClassByNameWithK8sFallback looks up the storage class based on the name
543
// If name is nil, it looks for the default k8s storage class storageclass.kubernetes.io/is-default-class
544
// If no storage class is found, returns nil
545
func GetStorageClassByNameWithK8sFallback(ctx context.Context, client client.Client, name *string) (*storagev1.StorageClass, error) {
1✔
546
        return getStorageClassByName(ctx, client, name, cdiv1.DataVolumeArchive)
1✔
547
}
1✔
548

549
// GetStorageClassByNameWithVirtFallback looks up the storage class based on the name
550
// If name is nil, it looks for the following, in this order:
551
// default kubevirt storage class (if the caller is interested) storageclass.kubevirt.io/is-default-class
552
// default k8s storage class storageclass.kubernetes.io/is-default-class
553
// If no storage class is found, returns nil
554
func GetStorageClassByNameWithVirtFallback(ctx context.Context, client client.Client, name *string, contentType cdiv1.DataVolumeContentType) (*storagev1.StorageClass, error) {
1✔
555
        return getStorageClassByName(ctx, client, name, contentType)
1✔
556
}
1✔
557

558
// getFallbackStorageClass looks for a default virt/k8s storage class according to the content type
559
// If no storage class is found, returns nil
560
func getFallbackStorageClass(ctx context.Context, client client.Client, contentType cdiv1.DataVolumeContentType) (*storagev1.StorageClass, error) {
1✔
561
        storageClasses := &storagev1.StorageClassList{}
1✔
562
        if err := client.List(ctx, storageClasses); err != nil {
1✔
563
                klog.V(3).Info("Unable to retrieve available storage classes")
×
564
                return nil, errors.New("unable to retrieve storage classes")
×
565
        }
×
566

567
        if GetContentType(contentType) == cdiv1.DataVolumeKubeVirt {
2✔
568
                if virtSc := GetPlatformDefaultStorageClass(storageClasses, AnnDefaultVirtStorageClass); virtSc != nil {
2✔
569
                        return virtSc, nil
1✔
570
                }
1✔
571
        }
572
        return GetPlatformDefaultStorageClass(storageClasses, AnnDefaultStorageClass), nil
1✔
573
}
574

575
// GetPlatformDefaultStorageClass returns the default storage class according to the provided annotation or nil if none found
576
func GetPlatformDefaultStorageClass(storageClasses *storagev1.StorageClassList, defaultAnnotationKey string) *storagev1.StorageClass {
1✔
577
        defaultClasses := []storagev1.StorageClass{}
1✔
578

1✔
579
        for _, storageClass := range storageClasses.Items {
2✔
580
                if storageClass.Annotations[defaultAnnotationKey] == "true" {
2✔
581
                        defaultClasses = append(defaultClasses, storageClass)
1✔
582
                }
1✔
583
        }
584

585
        if len(defaultClasses) == 0 {
2✔
586
                return nil
1✔
587
        }
1✔
588

589
        // Primary sort by creation timestamp, newest first
590
        // Secondary sort by class name, ascending order
591
        // Follows k8s behavior
592
        // https://github.com/kubernetes/kubernetes/blob/731068288e112c8b5af70f676296cc44661e84f4/pkg/volume/util/storageclass.go#L58-L59
593
        sort.Slice(defaultClasses, func(i, j int) bool {
2✔
594
                if defaultClasses[i].CreationTimestamp.UnixNano() == defaultClasses[j].CreationTimestamp.UnixNano() {
2✔
595
                        return defaultClasses[i].Name < defaultClasses[j].Name
1✔
596
                }
1✔
597
                return defaultClasses[i].CreationTimestamp.UnixNano() > defaultClasses[j].CreationTimestamp.UnixNano()
1✔
598
        })
599
        if len(defaultClasses) > 1 {
2✔
600
                klog.V(3).Infof("%d default StorageClasses were found, choosing: %s", len(defaultClasses), defaultClasses[0].Name)
1✔
601
        }
1✔
602

603
        return &defaultClasses[0]
1✔
604
}
605

606
// GetFilesystemOverheadForStorageClass determines the filesystem overhead defined in CDIConfig for the storageClass.
607
func GetFilesystemOverheadForStorageClass(ctx context.Context, client client.Client, storageClassName *string) (cdiv1.Percent, error) {
×
608
        if storageClassName != nil && *storageClassName == "" {
×
609
                klog.V(3).Info("No storage class name passed")
×
610
                return "0", nil
×
611
        }
×
612

613
        cdiConfig := &cdiv1.CDIConfig{}
×
614
        if err := client.Get(ctx, types.NamespacedName{Name: common.ConfigName}, cdiConfig); err != nil {
×
615
                if k8serrors.IsNotFound(err) {
×
616
                        klog.V(1).Info("CDIConfig does not exist, pod will not start until it does")
×
617
                        return "0", nil
×
618
                }
×
619
                return "0", err
×
620
        }
621

622
        targetStorageClass, err := GetStorageClassByNameWithK8sFallback(ctx, client, storageClassName)
×
623
        if err != nil || targetStorageClass == nil {
×
624
                klog.V(3).Info("Storage class", storageClassName, "not found, trying default storage class")
×
625
                targetStorageClass, err = GetStorageClassByNameWithK8sFallback(ctx, client, nil)
×
626
                if err != nil {
×
627
                        klog.V(3).Info("No default storage class found, continuing with global overhead")
×
628
                        return cdiConfig.Status.FilesystemOverhead.Global, nil
×
629
                }
×
630
        }
631

632
        if cdiConfig.Status.FilesystemOverhead == nil {
×
633
                klog.Errorf("CDIConfig filesystemOverhead used before config controller ran reconcile. Hopefully this only happens during unit testing.")
×
634
                return "0", nil
×
635
        }
×
636

637
        if targetStorageClass == nil {
×
638
                klog.V(3).Info("Storage class", storageClassName, "not found, continuing with global overhead")
×
639
                return cdiConfig.Status.FilesystemOverhead.Global, nil
×
640
        }
×
641

642
        klog.V(3).Info("target storage class for overhead", targetStorageClass.GetName())
×
643

×
644
        perStorageConfig := cdiConfig.Status.FilesystemOverhead.StorageClass
×
645

×
646
        storageClassOverhead, found := perStorageConfig[targetStorageClass.GetName()]
×
647
        if found {
×
648
                return storageClassOverhead, nil
×
649
        }
×
650

651
        return cdiConfig.Status.FilesystemOverhead.Global, nil
×
652
}
653

654
// GetDefaultPodResourceRequirements gets default pod resource requirements from cdi config status
655
func GetDefaultPodResourceRequirements(client client.Client) (*corev1.ResourceRequirements, error) {
×
656
        cdiconfig := &cdiv1.CDIConfig{}
×
657
        if err := client.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiconfig); err != nil {
×
658
                klog.Errorf("Unable to find CDI configuration, %v\n", err)
×
659
                return nil, err
×
660
        }
×
661

662
        return cdiconfig.Status.DefaultPodResourceRequirements, nil
×
663
}
664

665
// GetImagePullSecrets gets the imagePullSecrets needed to pull images from the cdi config
666
func GetImagePullSecrets(client client.Client) ([]corev1.LocalObjectReference, error) {
×
667
        cdiconfig := &cdiv1.CDIConfig{}
×
668
        if err := client.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiconfig); err != nil {
×
669
                klog.Errorf("Unable to find CDI configuration, %v\n", err)
×
670
                return nil, err
×
671
        }
×
672

673
        return cdiconfig.Status.ImagePullSecrets, nil
×
674
}
675

676
// GetPodFromPvc determines the pod associated with the pvc passed in.
677
func GetPodFromPvc(c client.Client, namespace string, pvc *corev1.PersistentVolumeClaim) (*corev1.Pod, error) {
×
678
        l, _ := labels.Parse(common.PrometheusLabelKey)
×
679
        pods := &corev1.PodList{}
×
680
        listOptions := client.ListOptions{
×
681
                LabelSelector: l,
×
682
        }
×
683
        if err := c.List(context.TODO(), pods, &listOptions); err != nil {
×
684
                return nil, err
×
685
        }
×
686

687
        pvcUID := pvc.GetUID()
×
688
        for _, pod := range pods.Items {
×
689
                if ShouldIgnorePod(&pod, pvc) {
×
690
                        continue
×
691
                }
692
                for _, or := range pod.OwnerReferences {
×
693
                        if or.UID == pvcUID {
×
694
                                return &pod, nil
×
695
                        }
×
696
                }
697

698
                // TODO: check this
699
                val, exists := pod.Labels[CloneUniqueID]
×
700
                if exists && val == string(pvcUID)+common.ClonerSourcePodNameSuffix {
×
701
                        return &pod, nil
×
702
                }
×
703
        }
704
        return nil, errors.Errorf("Unable to find pod owned by UID: %s, in namespace: %s", string(pvcUID), namespace)
×
705
}
706

707
// AddVolumeDevices returns VolumeDevice slice with one block device for pods using PV with block volume mode
708
func AddVolumeDevices() []corev1.VolumeDevice {
×
709
        volumeDevices := []corev1.VolumeDevice{
×
710
                {
×
711
                        Name:       DataVolName,
×
712
                        DevicePath: common.WriteBlockPath,
×
713
                },
×
714
        }
×
715
        return volumeDevices
×
716
}
×
717

718
// GetPodsUsingPVCs returns Pods currently using PVCs
719
func GetPodsUsingPVCs(ctx context.Context, c client.Client, namespace string, names sets.Set[string], allowReadOnly bool) ([]corev1.Pod, error) {
×
720
        pl := &corev1.PodList{}
×
721
        // hopefully using cached client here
×
722
        err := c.List(ctx, pl, &client.ListOptions{Namespace: namespace})
×
723
        if err != nil {
×
724
                return nil, err
×
725
        }
×
726

727
        var pods []corev1.Pod
×
728
        for _, pod := range pl.Items {
×
729
                if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed {
×
730
                        continue
×
731
                }
732
                for _, volume := range pod.Spec.Volumes {
×
733
                        if volume.VolumeSource.PersistentVolumeClaim != nil &&
×
734
                                names.Has(volume.PersistentVolumeClaim.ClaimName) {
×
735
                                addPod := true
×
736
                                if allowReadOnly {
×
737
                                        if !volume.VolumeSource.PersistentVolumeClaim.ReadOnly {
×
738
                                                onlyReadOnly := true
×
739
                                                for _, c := range pod.Spec.Containers {
×
740
                                                        for _, vm := range c.VolumeMounts {
×
741
                                                                if vm.Name == volume.Name && !vm.ReadOnly {
×
742
                                                                        onlyReadOnly = false
×
743
                                                                }
×
744
                                                        }
745
                                                        for _, vm := range c.VolumeDevices {
×
746
                                                                if vm.Name == volume.Name {
×
747
                                                                        // Node level rw mount and container can't mount block device ro
×
748
                                                                        onlyReadOnly = false
×
749
                                                                }
×
750
                                                        }
751
                                                }
752
                                                if onlyReadOnly {
×
753
                                                        // no rw mounts
×
754
                                                        addPod = false
×
755
                                                }
×
756
                                        } else {
×
757
                                                // all mounts must be ro
×
758
                                                addPod = false
×
759
                                        }
×
760
                                        if strings.HasSuffix(pod.Name, common.ClonerSourcePodNameSuffix) && pod.Labels != nil &&
×
761
                                                pod.Labels[common.CDIComponentLabel] == common.ClonerSourcePodName {
×
762
                                                // Host assisted clone source pod only reads from source
×
763
                                                // But some drivers disallow mounting a block PVC ReadOnly
×
764
                                                addPod = false
×
765
                                        }
×
766
                                }
767
                                if addPod {
×
768
                                        pods = append(pods, pod)
×
769
                                        break
×
770
                                }
771
                        }
772
                }
773
        }
774

775
        return pods, nil
×
776
}
777

778
// GetWorkloadNodePlacement extracts the workload-specific nodeplacement values from the CDI CR
779
func GetWorkloadNodePlacement(ctx context.Context, c client.Client) (*sdkapi.NodePlacement, error) {
×
780
        cr, err := GetActiveCDI(ctx, c)
×
781
        if err != nil {
×
782
                return nil, err
×
783
        }
×
784

785
        if cr == nil {
×
786
                return nil, fmt.Errorf("no active CDI")
×
787
        }
×
788

789
        return &cr.Spec.Workloads, nil
×
790
}
791

792
// GetActiveCDI returns the active CDI CR
793
func GetActiveCDI(ctx context.Context, c client.Client) (*cdiv1.CDI, error) {
1✔
794
        crList := &cdiv1.CDIList{}
1✔
795
        if err := c.List(ctx, crList, &client.ListOptions{}); err != nil {
1✔
796
                return nil, err
×
797
        }
×
798

799
        if len(crList.Items) == 0 {
2✔
800
                return nil, nil
1✔
801
        }
1✔
802

803
        if len(crList.Items) == 1 {
2✔
804
                return &crList.Items[0], nil
1✔
805
        }
1✔
806

807
        var activeResources []cdiv1.CDI
1✔
808
        for _, cr := range crList.Items {
2✔
809
                if cr.Status.Phase != sdkapi.PhaseError {
2✔
810
                        activeResources = append(activeResources, cr)
1✔
811
                }
1✔
812
        }
813

814
        if len(activeResources) != 1 {
2✔
815
                return nil, fmt.Errorf("invalid number of active CDI resources: %d", len(activeResources))
1✔
816
        }
1✔
817

818
        return &activeResources[0], nil
1✔
819
}
820

821
// IsPopulated returns if the passed in PVC has been populated according to the rules outlined in pkg/apis/core/<version>/utils.go
822
func IsPopulated(pvc *corev1.PersistentVolumeClaim, c client.Client) (bool, error) {
×
823
        return cdiv1utils.IsPopulated(pvc, func(name, namespace string) (*cdiv1.DataVolume, error) {
×
824
                dv := &cdiv1.DataVolume{}
×
825
                err := c.Get(context.TODO(), types.NamespacedName{Name: name, Namespace: namespace}, dv)
×
826
                return dv, err
×
827
        })
×
828
}
829

830
// GetPreallocation returns the preallocation setting for the specified object (DV or VolumeImportSource), falling back to StorageClass and global setting (in this order)
831
func GetPreallocation(ctx context.Context, client client.Client, preallocation *bool) bool {
×
832
        // First, the DV's preallocation
×
833
        if preallocation != nil {
×
834
                return *preallocation
×
835
        }
×
836

837
        cdiconfig := &cdiv1.CDIConfig{}
×
838
        if err := client.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiconfig); err != nil {
×
839
                klog.Errorf("Unable to find CDI configuration, %v\n", err)
×
840
                return defaultPreallocation
×
841
        }
×
842

843
        return cdiconfig.Status.Preallocation
×
844
}
845

846
// ImmediateBindingRequested returns if an object has the ImmediateBinding annotation
847
func ImmediateBindingRequested(obj metav1.Object) bool {
×
848
        _, isImmediateBindingRequested := obj.GetAnnotations()[AnnImmediateBinding]
×
849
        return isImmediateBindingRequested
×
850
}
×
851

852
// GetPriorityClass gets PVC priority class
853
func GetPriorityClass(pvc *corev1.PersistentVolumeClaim) string {
×
854
        anno := pvc.GetAnnotations()
×
855
        return anno[AnnPriorityClassName]
×
856
}
×
857

858
// ShouldDeletePod returns whether the PVC workload pod should be deleted
859
func ShouldDeletePod(pvc *corev1.PersistentVolumeClaim) bool {
×
860
        return pvc.GetAnnotations()[AnnPodRetainAfterCompletion] != "true" || pvc.GetAnnotations()[AnnRequiresScratch] == "true" || pvc.GetAnnotations()[AnnRequiresDirectIO] == "true" || pvc.DeletionTimestamp != nil
×
861
}
×
862

863
// AddFinalizer adds a finalizer to a resource
864
func AddFinalizer(obj metav1.Object, name string) {
×
865
        if HasFinalizer(obj, name) {
×
866
                return
×
867
        }
×
868

869
        obj.SetFinalizers(append(obj.GetFinalizers(), name))
×
870
}
871

872
// RemoveFinalizer removes a finalizer from a resource
873
func RemoveFinalizer(obj metav1.Object, name string) {
×
874
        if !HasFinalizer(obj, name) {
×
875
                return
×
876
        }
×
877

878
        var finalizers []string
×
879
        for _, f := range obj.GetFinalizers() {
×
880
                if f != name {
×
881
                        finalizers = append(finalizers, f)
×
882
                }
×
883
        }
884

885
        obj.SetFinalizers(finalizers)
×
886
}
887

888
// HasFinalizer returns true if a resource has a specific finalizer
889
func HasFinalizer(object metav1.Object, value string) bool {
×
890
        for _, f := range object.GetFinalizers() {
×
891
                if f == value {
×
892
                        return true
×
893
                }
×
894
        }
895
        return false
×
896
}
897

898
// ValidateCloneTokenPVC validates clone token for source and target PVCs
899
func ValidateCloneTokenPVC(t string, v token.Validator, source, target *corev1.PersistentVolumeClaim) error {
×
900
        if source.Namespace == target.Namespace {
×
901
                return nil
×
902
        }
×
903

904
        tokenData, err := v.Validate(t)
×
905
        if err != nil {
×
906
                return errors.Wrap(err, "error verifying token")
×
907
        }
×
908

909
        tokenResourceName := getTokenResourceNamePvc(source)
×
910
        srcName := getSourceNamePvc(source)
×
911

×
912
        return validateTokenData(tokenData, source.Namespace, srcName, target.Namespace, target.Name, string(target.UID), tokenResourceName)
×
913
}
914

915
// ValidateCloneTokenDV validates clone token for DV
916
func ValidateCloneTokenDV(validator token.Validator, dv *cdiv1.DataVolume) error {
×
917
        _, sourceName, sourceNamespace := GetCloneSourceInfo(dv)
×
918
        if sourceNamespace == "" || sourceNamespace == dv.Namespace {
×
919
                return nil
×
920
        }
×
921

922
        tok, ok := dv.Annotations[AnnCloneToken]
×
923
        if !ok {
×
924
                return errors.New("clone token missing")
×
925
        }
×
926

927
        tokenData, err := validator.Validate(tok)
×
928
        if err != nil {
×
929
                return errors.Wrap(err, "error verifying token")
×
930
        }
×
931

932
        tokenResourceName := getTokenResourceNameDataVolume(dv.Spec.Source)
×
933
        if tokenResourceName == "" {
×
934
                return errors.New("token resource name empty, can't verify properly")
×
935
        }
×
936

937
        return validateTokenData(tokenData, sourceNamespace, sourceName, dv.Namespace, dv.Name, "", tokenResourceName)
×
938
}
939

940
func getTokenResourceNameDataVolume(source *cdiv1.DataVolumeSource) string {
×
941
        if source.PVC != nil {
×
942
                return "persistentvolumeclaims"
×
943
        } else if source.Snapshot != nil {
×
944
                return "volumesnapshots"
×
945
        }
×
946

947
        return ""
×
948
}
949

950
func getTokenResourceNamePvc(sourcePvc *corev1.PersistentVolumeClaim) string {
×
951
        if v, ok := sourcePvc.Labels[common.CDIComponentLabel]; ok && v == common.CloneFromSnapshotFallbackPVCCDILabel {
×
952
                return "volumesnapshots"
×
953
        }
×
954

955
        return "persistentvolumeclaims"
×
956
}
957

958
func getSourceNamePvc(sourcePvc *corev1.PersistentVolumeClaim) string {
×
959
        if v, ok := sourcePvc.Labels[common.CDIComponentLabel]; ok && v == common.CloneFromSnapshotFallbackPVCCDILabel {
×
960
                if sourcePvc.Spec.DataSourceRef != nil {
×
961
                        return sourcePvc.Spec.DataSourceRef.Name
×
962
                }
×
963
        }
964

965
        return sourcePvc.Name
×
966
}
967

968
func validateTokenData(tokenData *token.Payload, srcNamespace, srcName, targetNamespace, targetName, targetUID, tokenResourceName string) error {
×
969
        uid := tokenData.Params["uid"]
×
970
        if tokenData.Operation != token.OperationClone ||
×
971
                tokenData.Name != srcName ||
×
972
                tokenData.Namespace != srcNamespace ||
×
973
                tokenData.Resource.Resource != tokenResourceName ||
×
974
                tokenData.Params["targetNamespace"] != targetNamespace ||
×
975
                tokenData.Params["targetName"] != targetName ||
×
976
                (uid != "" && uid != targetUID) {
×
977
                return errors.New("invalid token")
×
978
        }
×
979

980
        return nil
×
981
}
982

983
// IsSnapshotValidForClone returns an error if the passed snapshot is not valid for cloning
984
func IsSnapshotValidForClone(sourceSnapshot *snapshotv1.VolumeSnapshot) error {
×
985
        if sourceSnapshot.Status == nil {
×
986
                return fmt.Errorf("no status on source snapshot yet")
×
987
        }
×
988
        if !IsSnapshotReady(sourceSnapshot) {
×
989
                klog.V(3).Info("snapshot not ReadyToUse, while we allow this, probably going to be an issue going forward", "namespace", sourceSnapshot.Namespace, "name", sourceSnapshot.Name)
×
990
        }
×
991
        if sourceSnapshot.Status.Error != nil {
×
992
                errMessage := "no details"
×
993
                if msg := sourceSnapshot.Status.Error.Message; msg != nil {
×
994
                        errMessage = *msg
×
995
                }
×
996
                return fmt.Errorf("snapshot in error state with msg: %s", errMessage)
×
997
        }
998
        if sourceSnapshot.Spec.VolumeSnapshotClassName == nil ||
×
999
                *sourceSnapshot.Spec.VolumeSnapshotClassName == "" {
×
1000
                return fmt.Errorf("snapshot %s/%s does not have volume snap class populated, can't clone", sourceSnapshot.Name, sourceSnapshot.Namespace)
×
1001
        }
×
1002
        return nil
×
1003
}
1004

1005
// AddAnnotation adds an annotation to an object
1006
func AddAnnotation(obj metav1.Object, key, value string) {
1✔
1007
        if obj.GetAnnotations() == nil {
2✔
1008
                obj.SetAnnotations(make(map[string]string))
1✔
1009
        }
1✔
1010
        obj.GetAnnotations()[key] = value
1✔
1011
}
1012

1013
// AddLabel adds a label to an object
1014
func AddLabel(obj metav1.Object, key, value string) {
1✔
1015
        if obj.GetLabels() == nil {
2✔
1016
                obj.SetLabels(make(map[string]string))
1✔
1017
        }
1✔
1018
        obj.GetLabels()[key] = value
1✔
1019
}
1020

1021
// HandleFailedPod handles pod-creation errors and updates the pod's PVC without providing sensitive information
1022
func HandleFailedPod(err error, podName string, pvc *corev1.PersistentVolumeClaim, recorder record.EventRecorder, c client.Client) error {
×
1023
        if err == nil {
×
1024
                return nil
×
1025
        }
×
1026
        // Generic reason and msg to avoid providing sensitive information
1027
        reason := ErrStartingPod
×
1028
        msg := fmt.Sprintf(MessageErrStartingPod, podName)
×
1029

×
1030
        // Error handling to fine-tune the event with pertinent info
×
1031
        if ErrQuotaExceeded(err) {
×
1032
                reason = ErrExceededQuota
×
1033
        }
×
1034

1035
        recorder.Event(pvc, corev1.EventTypeWarning, reason, msg)
×
1036

×
1037
        if isCloneSourcePod := CreateCloneSourcePodName(pvc) == podName; isCloneSourcePod {
×
1038
                AddAnnotation(pvc, AnnSourceRunningCondition, "false")
×
1039
                AddAnnotation(pvc, AnnSourceRunningConditionReason, reason)
×
1040
                AddAnnotation(pvc, AnnSourceRunningConditionMessage, msg)
×
1041
        } else {
×
1042
                AddAnnotation(pvc, AnnRunningCondition, "false")
×
1043
                AddAnnotation(pvc, AnnRunningConditionReason, reason)
×
1044
                AddAnnotation(pvc, AnnRunningConditionMessage, msg)
×
1045
        }
×
1046

1047
        AddAnnotation(pvc, AnnPodPhase, string(corev1.PodFailed))
×
1048
        if err := c.Update(context.TODO(), pvc); err != nil {
×
1049
                return err
×
1050
        }
×
1051

1052
        return err
×
1053
}
1054

1055
// GetSource returns the source string which determines the type of source. If no source or invalid source found, default to http
1056
func GetSource(pvc *corev1.PersistentVolumeClaim) string {
×
1057
        source, found := pvc.Annotations[AnnSource]
×
1058
        if !found {
×
1059
                source = ""
×
1060
        }
×
1061
        switch source {
×
1062
        case
1063
                SourceHTTP,
1064
                SourceS3,
1065
                SourceGCS,
1066
                SourceGlance,
1067
                SourceNone,
1068
                SourceRegistry,
1069
                SourceImageio,
1070
                SourceVDDK:
×
1071
        default:
×
1072
                source = SourceHTTP
×
1073
        }
1074
        return source
×
1075
}
1076

1077
// GetEndpoint returns the endpoint string which contains the full path URI of the target object to be copied.
1078
func GetEndpoint(pvc *corev1.PersistentVolumeClaim) (string, error) {
×
1079
        ep, found := pvc.Annotations[AnnEndpoint]
×
1080
        if !found || ep == "" {
×
1081
                verb := "empty"
×
1082
                if !found {
×
1083
                        verb = "missing"
×
1084
                }
×
1085
                return ep, errors.Errorf("annotation %q in pvc \"%s/%s\" is %s", AnnEndpoint, pvc.Namespace, pvc.Name, verb)
×
1086
        }
1087
        return ep, nil
×
1088
}
1089

1090
// AddImportVolumeMounts is being called for pods using PV with filesystem volume mode
1091
func AddImportVolumeMounts() []corev1.VolumeMount {
×
1092
        volumeMounts := []corev1.VolumeMount{
×
1093
                {
×
1094
                        Name:      DataVolName,
×
1095
                        MountPath: common.ImporterDataDir,
×
1096
                },
×
1097
        }
×
1098
        return volumeMounts
×
1099
}
×
1100

1101
// ValidateRequestedCloneSize validates the clone size requirements on block
1102
func ValidateRequestedCloneSize(sourceResources, targetResources corev1.VolumeResourceRequirements) error {
×
1103
        sourceRequest, hasSource := sourceResources.Requests[corev1.ResourceStorage]
×
1104
        targetRequest, hasTarget := targetResources.Requests[corev1.ResourceStorage]
×
1105
        if !hasSource || !hasTarget {
×
1106
                return errors.New("source/target missing storage resource requests")
×
1107
        }
×
1108

1109
        // Verify that the target PVC size is equal or larger than the source.
1110
        if sourceRequest.Value() > targetRequest.Value() {
×
1111
                return errors.Errorf("target resources requests storage size is smaller than the source %d < %d", targetRequest.Value(), sourceRequest.Value())
×
1112
        }
×
1113
        return nil
×
1114
}
1115

1116
// CreateCloneSourcePodName creates clone source pod name
1117
func CreateCloneSourcePodName(targetPvc *corev1.PersistentVolumeClaim) string {
×
1118
        return string(targetPvc.GetUID()) + common.ClonerSourcePodNameSuffix
×
1119
}
×
1120

1121
// IsPVCComplete returns true if a PVC is in 'Succeeded' phase, false if not
1122
func IsPVCComplete(pvc *corev1.PersistentVolumeClaim) bool {
×
1123
        if pvc != nil {
×
1124
                phase, exists := pvc.ObjectMeta.Annotations[AnnPodPhase]
×
1125
                return exists && (phase == string(corev1.PodSucceeded))
×
1126
        }
×
1127
        return false
×
1128
}
1129

1130
// IsMultiStageImportInProgress returns true when a PVC is being part of an ongoing multi-stage import
1131
func IsMultiStageImportInProgress(pvc *corev1.PersistentVolumeClaim) bool {
×
1132
        if pvc != nil {
×
1133
                multiStageImport := metav1.HasAnnotation(pvc.ObjectMeta, AnnCurrentCheckpoint)
×
1134
                multiStageAlreadyDone := metav1.HasAnnotation(pvc.ObjectMeta, AnnMultiStageImportDone)
×
1135
                return multiStageImport && !multiStageAlreadyDone
×
1136
        }
×
1137
        return false
×
1138
}
1139

1140
// SetRestrictedSecurityContext sets the pod security params to be compatible with restricted PSA
1141
func SetRestrictedSecurityContext(podSpec *corev1.PodSpec) {
×
1142
        hasVolumeMounts := false
×
1143
        for _, containers := range [][]corev1.Container{podSpec.InitContainers, podSpec.Containers} {
×
1144
                for i := range containers {
×
1145
                        container := &containers[i]
×
1146
                        if container.SecurityContext == nil {
×
1147
                                container.SecurityContext = &corev1.SecurityContext{}
×
1148
                        }
×
1149
                        container.SecurityContext.Capabilities = &corev1.Capabilities{
×
1150
                                Drop: []corev1.Capability{
×
1151
                                        "ALL",
×
1152
                                },
×
1153
                        }
×
1154
                        container.SecurityContext.SeccompProfile = &corev1.SeccompProfile{
×
1155
                                Type: corev1.SeccompProfileTypeRuntimeDefault,
×
1156
                        }
×
1157
                        container.SecurityContext.AllowPrivilegeEscalation = ptr.To[bool](false)
×
1158
                        container.SecurityContext.RunAsNonRoot = ptr.To[bool](true)
×
1159
                        container.SecurityContext.RunAsUser = ptr.To[int64](common.QemuSubGid)
×
1160
                        if len(container.VolumeMounts) > 0 {
×
1161
                                hasVolumeMounts = true
×
1162
                        }
×
1163
                }
1164
        }
1165

1166
        if podSpec.SecurityContext == nil {
×
1167
                podSpec.SecurityContext = &corev1.PodSecurityContext{}
×
1168
        }
×
1169
        // Some tools like istio inject containers and thus rely on a pod level seccomp profile being specified
1170
        podSpec.SecurityContext.SeccompProfile = &corev1.SeccompProfile{
×
1171
                Type: corev1.SeccompProfileTypeRuntimeDefault,
×
1172
        }
×
1173
        if hasVolumeMounts {
×
1174
                podSpec.SecurityContext.FSGroup = ptr.To[int64](common.QemuSubGid)
×
1175
        }
×
1176
}
1177

1178
// SetNodeNameIfPopulator sets NodeName in a pod spec when the PVC is being handled by a CDI volume populator
1179
func SetNodeNameIfPopulator(pvc *corev1.PersistentVolumeClaim, podSpec *corev1.PodSpec) {
×
1180
        _, isPopulator := pvc.Annotations[AnnPopulatorKind]
×
1181
        nodeName := pvc.Annotations[AnnSelectedNode]
×
1182
        if isPopulator && nodeName != "" {
×
1183
                podSpec.NodeName = nodeName
×
1184
        }
×
1185
}
1186

1187
// CreatePvc creates PVC
1188
func CreatePvc(name, ns string, annotations, labels map[string]string) *corev1.PersistentVolumeClaim {
1✔
1189
        return CreatePvcInStorageClass(name, ns, nil, annotations, labels, corev1.ClaimBound)
1✔
1190
}
1✔
1191

1192
// CreatePvcInStorageClass creates PVC with storgae class
1193
func CreatePvcInStorageClass(name, ns string, storageClassName *string, annotations, labels map[string]string, phase corev1.PersistentVolumeClaimPhase) *corev1.PersistentVolumeClaim {
1✔
1194
        pvc := &corev1.PersistentVolumeClaim{
1✔
1195
                ObjectMeta: metav1.ObjectMeta{
1✔
1196
                        Name:        name,
1✔
1197
                        Namespace:   ns,
1✔
1198
                        Annotations: annotations,
1✔
1199
                        Labels:      labels,
1✔
1200
                        UID:         types.UID(ns + "-" + name),
1✔
1201
                },
1✔
1202
                Spec: corev1.PersistentVolumeClaimSpec{
1✔
1203
                        AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadOnlyMany, corev1.ReadWriteOnce},
1✔
1204
                        Resources: corev1.VolumeResourceRequirements{
1✔
1205
                                Requests: corev1.ResourceList{
1✔
1206
                                        corev1.ResourceStorage: resource.MustParse("1G"),
1✔
1207
                                },
1✔
1208
                        },
1✔
1209
                        StorageClassName: storageClassName,
1✔
1210
                },
1✔
1211
                Status: corev1.PersistentVolumeClaimStatus{
1✔
1212
                        Phase: phase,
1✔
1213
                },
1✔
1214
        }
1✔
1215
        pvc.Status.Capacity = pvc.Spec.Resources.Requests.DeepCopy()
1✔
1216
        if pvc.Status.Phase == corev1.ClaimBound {
2✔
1217
                pvc.Spec.VolumeName = "pv-" + string(pvc.UID)
1✔
1218
        }
1✔
1219
        return pvc
1✔
1220
}
1221

1222
// GetAPIServerKey returns API server RSA key
1223
func GetAPIServerKey() *rsa.PrivateKey {
×
1224
        apiServerKeyOnce.Do(func() {
×
1225
                apiServerKey, _ = rsa.GenerateKey(rand.Reader, 2048)
×
1226
        })
×
1227
        return apiServerKey
×
1228
}
1229

1230
// CreateStorageClass creates storage class CR
1231
func CreateStorageClass(name string, annotations map[string]string) *storagev1.StorageClass {
1✔
1232
        return &storagev1.StorageClass{
1✔
1233
                ObjectMeta: metav1.ObjectMeta{
1✔
1234
                        Name:        name,
1✔
1235
                        Annotations: annotations,
1✔
1236
                },
1✔
1237
        }
1✔
1238
}
1✔
1239

1240
// CreateImporterTestPod creates importer test pod CR
1241
func CreateImporterTestPod(pvc *corev1.PersistentVolumeClaim, dvname string, scratchPvc *corev1.PersistentVolumeClaim) *corev1.Pod {
×
1242
        // importer pod name contains the pvc name
×
1243
        podName := fmt.Sprintf("%s-%s", common.ImporterPodName, pvc.Name)
×
1244

×
1245
        blockOwnerDeletion := true
×
1246
        isController := true
×
1247

×
1248
        volumes := []corev1.Volume{
×
1249
                {
×
1250
                        Name: dvname,
×
1251
                        VolumeSource: corev1.VolumeSource{
×
1252
                                PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
×
1253
                                        ClaimName: pvc.Name,
×
1254
                                        ReadOnly:  false,
×
1255
                                },
×
1256
                        },
×
1257
                },
×
1258
        }
×
1259

×
1260
        if scratchPvc != nil {
×
1261
                volumes = append(volumes, corev1.Volume{
×
1262
                        Name: ScratchVolName,
×
1263
                        VolumeSource: corev1.VolumeSource{
×
1264
                                PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
×
1265
                                        ClaimName: scratchPvc.Name,
×
1266
                                        ReadOnly:  false,
×
1267
                                },
×
1268
                        },
×
1269
                })
×
1270
        }
×
1271

1272
        pod := &corev1.Pod{
×
1273
                TypeMeta: metav1.TypeMeta{
×
1274
                        Kind:       "Pod",
×
1275
                        APIVersion: "v1",
×
1276
                },
×
1277
                ObjectMeta: metav1.ObjectMeta{
×
1278
                        Name:      podName,
×
1279
                        Namespace: pvc.Namespace,
×
1280
                        Annotations: map[string]string{
×
1281
                                AnnCreatedBy: "yes",
×
1282
                        },
×
1283
                        Labels: map[string]string{
×
1284
                                common.CDILabelKey:        common.CDILabelValue,
×
1285
                                common.CDIComponentLabel:  common.ImporterPodName,
×
1286
                                common.PrometheusLabelKey: common.PrometheusLabelValue,
×
1287
                        },
×
1288
                        OwnerReferences: []metav1.OwnerReference{
×
1289
                                {
×
1290
                                        APIVersion:         "v1",
×
1291
                                        Kind:               "PersistentVolumeClaim",
×
1292
                                        Name:               pvc.Name,
×
1293
                                        UID:                pvc.GetUID(),
×
1294
                                        BlockOwnerDeletion: &blockOwnerDeletion,
×
1295
                                        Controller:         &isController,
×
1296
                                },
×
1297
                        },
×
1298
                },
×
1299
                Spec: corev1.PodSpec{
×
1300
                        Containers: []corev1.Container{
×
1301
                                {
×
1302
                                        Name:            common.ImporterPodName,
×
1303
                                        Image:           "test/myimage",
×
1304
                                        ImagePullPolicy: corev1.PullPolicy("Always"),
×
1305
                                        Args:            []string{"-v=5"},
×
1306
                                        Ports: []corev1.ContainerPort{
×
1307
                                                {
×
1308
                                                        Name:          "metrics",
×
1309
                                                        ContainerPort: 8443,
×
1310
                                                        Protocol:      corev1.ProtocolTCP,
×
1311
                                                },
×
1312
                                        },
×
1313
                                },
×
1314
                        },
×
1315
                        RestartPolicy: corev1.RestartPolicyOnFailure,
×
1316
                        Volumes:       volumes,
×
1317
                },
×
1318
        }
×
1319

×
1320
        ep, _ := GetEndpoint(pvc)
×
1321
        source := GetSource(pvc)
×
1322
        contentType := GetPVCContentType(pvc)
×
1323
        imageSize, _ := GetRequestedImageSize(pvc)
×
1324
        volumeMode := GetVolumeMode(pvc)
×
1325

×
1326
        env := []corev1.EnvVar{
×
1327
                {
×
1328
                        Name:  common.ImporterSource,
×
1329
                        Value: source,
×
1330
                },
×
1331
                {
×
1332
                        Name:  common.ImporterEndpoint,
×
1333
                        Value: ep,
×
1334
                },
×
1335
                {
×
1336
                        Name:  common.ImporterContentType,
×
1337
                        Value: string(contentType),
×
1338
                },
×
1339
                {
×
1340
                        Name:  common.ImporterImageSize,
×
1341
                        Value: imageSize,
×
1342
                },
×
1343
                {
×
1344
                        Name:  common.OwnerUID,
×
1345
                        Value: string(pvc.UID),
×
1346
                },
×
1347
                {
×
1348
                        Name:  common.InsecureTLSVar,
×
1349
                        Value: "false",
×
1350
                },
×
1351
        }
×
1352
        pod.Spec.Containers[0].Env = env
×
1353
        if volumeMode == corev1.PersistentVolumeBlock {
×
1354
                pod.Spec.Containers[0].VolumeDevices = AddVolumeDevices()
×
1355
        } else {
×
1356
                pod.Spec.Containers[0].VolumeMounts = AddImportVolumeMounts()
×
1357
        }
×
1358

1359
        if scratchPvc != nil {
×
1360
                pod.Spec.Containers[0].VolumeMounts = append(pod.Spec.Containers[0].VolumeMounts, corev1.VolumeMount{
×
1361
                        Name:      ScratchVolName,
×
1362
                        MountPath: common.ScratchDataDir,
×
1363
                })
×
1364
        }
×
1365

1366
        return pod
×
1367
}
1368

1369
// CreateStorageClassWithProvisioner creates CR of storage class with provisioner
1370
func CreateStorageClassWithProvisioner(name string, annotations, labels map[string]string, provisioner string) *storagev1.StorageClass {
×
1371
        return &storagev1.StorageClass{
×
1372
                Provisioner: provisioner,
×
1373
                ObjectMeta: metav1.ObjectMeta{
×
1374
                        Name:        name,
×
1375
                        Annotations: annotations,
×
1376
                        Labels:      labels,
×
1377
                },
×
1378
        }
×
1379
}
×
1380

1381
// CreateClient creates a fake client
1382
func CreateClient(objs ...runtime.Object) client.Client {
1✔
1383
        s := scheme.Scheme
1✔
1384
        _ = cdiv1.AddToScheme(s)
1✔
1385
        _ = corev1.AddToScheme(s)
1✔
1386
        _ = storagev1.AddToScheme(s)
1✔
1387
        _ = ocpconfigv1.Install(s)
1✔
1388

1✔
1389
        return fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objs...).Build()
1✔
1390
}
1✔
1391

1392
// ErrQuotaExceeded reports whether err is an exceeded-quota error, detected by
// the substring "exceeded quota:" in its message. A nil error is never a quota
// error.
func ErrQuotaExceeded(err error) bool {
	if err == nil {
		// Calling Error() on a nil error would panic.
		return false
	}
	return strings.Contains(err.Error(), "exceeded quota:")
}
×
1396

1397
// GetContentType returns the content type. If invalid or not set, default to kubevirt
1398
func GetContentType(contentType cdiv1.DataVolumeContentType) cdiv1.DataVolumeContentType {
1✔
1399
        switch contentType {
1✔
1400
        case
1401
                cdiv1.DataVolumeKubeVirt,
1402
                cdiv1.DataVolumeArchive:
1✔
1403
        default:
×
1404
                // TODO - shouldn't archive be the default?
×
1405
                contentType = cdiv1.DataVolumeKubeVirt
×
1406
        }
1407
        return contentType
1✔
1408
}
1409

1410
// GetPVCContentType returns the content type of the source image. If invalid or not set, default to kubevirt
1411
func GetPVCContentType(pvc *corev1.PersistentVolumeClaim) cdiv1.DataVolumeContentType {
×
1412
        contentType, found := pvc.Annotations[AnnContentType]
×
1413
        if !found {
×
1414
                // TODO - shouldn't archive be the default?
×
1415
                return cdiv1.DataVolumeKubeVirt
×
1416
        }
×
1417

1418
        return GetContentType(cdiv1.DataVolumeContentType(contentType))
×
1419
}
1420

1421
// GetNamespace returns namespace unless it is the empty string, in which case
// defaultNamespace is returned instead.
func GetNamespace(namespace, defaultNamespace string) string {
	if namespace != "" {
		return namespace
	}
	return defaultNamespace
}
1428

1429
// IsErrCacheNotStarted checked is the error is of cache not started
1430
func IsErrCacheNotStarted(err error) bool {
×
1431
        target := &runtimecache.ErrCacheNotStarted{}
×
1432
        return errors.As(err, &target)
×
1433
}
×
1434

1435
// NewImportDataVolume returns new import DataVolume CR
1436
func NewImportDataVolume(name string) *cdiv1.DataVolume {
×
1437
        return &cdiv1.DataVolume{
×
1438
                TypeMeta: metav1.TypeMeta{APIVersion: cdiv1.SchemeGroupVersion.String()},
×
1439
                ObjectMeta: metav1.ObjectMeta{
×
1440
                        Name:      name,
×
1441
                        Namespace: metav1.NamespaceDefault,
×
1442
                        UID:       types.UID(metav1.NamespaceDefault + "-" + name),
×
1443
                },
×
1444
                Spec: cdiv1.DataVolumeSpec{
×
1445
                        Source: &cdiv1.DataVolumeSource{
×
1446
                                HTTP: &cdiv1.DataVolumeSourceHTTP{
×
1447
                                        URL: "http://example.com/data",
×
1448
                                },
×
1449
                        },
×
1450
                        PVC: &corev1.PersistentVolumeClaimSpec{
×
1451
                                AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
×
1452
                        },
×
1453
                        PriorityClassName: "p0",
×
1454
                },
×
1455
        }
×
1456
}
×
1457

1458
// GetCloneSourceInfo returns the type, name and namespace of the cloning source
1459
func GetCloneSourceInfo(dv *cdiv1.DataVolume) (sourceType, sourceName, sourceNamespace string) {
×
1460
        // Cloning sources are mutually exclusive
×
1461
        if dv.Spec.Source.PVC != nil {
×
1462
                return "pvc", dv.Spec.Source.PVC.Name, dv.Spec.Source.PVC.Namespace
×
1463
        }
×
1464

1465
        if dv.Spec.Source.Snapshot != nil {
×
1466
                return "snapshot", dv.Spec.Source.Snapshot.Name, dv.Spec.Source.Snapshot.Namespace
×
1467
        }
×
1468

1469
        return "", "", ""
×
1470
}
1471

1472
// IsWaitForFirstConsumerEnabled tells us if we should respect "real" WFFC behavior or just let our worker pods randomly spawn
1473
func IsWaitForFirstConsumerEnabled(obj metav1.Object, gates featuregates.FeatureGates) (bool, error) {
×
1474
        // when PVC requests immediateBinding it cannot honor wffc logic
×
1475
        isImmediateBindingRequested := ImmediateBindingRequested(obj)
×
1476
        pvcHonorWaitForFirstConsumer := !isImmediateBindingRequested
×
1477
        globalHonorWaitForFirstConsumer, err := gates.HonorWaitForFirstConsumerEnabled()
×
1478
        if err != nil {
×
1479
                return false, err
×
1480
        }
×
1481

1482
        return pvcHonorWaitForFirstConsumer && globalHonorWaitForFirstConsumer, nil
×
1483
}
1484

1485
// AddImmediateBindingAnnotationIfWFFCDisabled adds the immediateBinding annotation if wffc feature gate is disabled
1486
func AddImmediateBindingAnnotationIfWFFCDisabled(obj metav1.Object, gates featuregates.FeatureGates) error {
×
1487
        globalHonorWaitForFirstConsumer, err := gates.HonorWaitForFirstConsumerEnabled()
×
1488
        if err != nil {
×
1489
                return err
×
1490
        }
×
1491
        if !globalHonorWaitForFirstConsumer {
×
1492
                AddAnnotation(obj, AnnImmediateBinding, "")
×
1493
        }
×
1494
        return nil
×
1495
}
1496

1497
// InflateSizeWithOverhead inflates a storage size with proper overhead calculations
1498
func InflateSizeWithOverhead(ctx context.Context, c client.Client, imgSize int64, pvcSpec *corev1.PersistentVolumeClaimSpec) (resource.Quantity, error) {
×
1499
        var returnSize resource.Quantity
×
1500

×
1501
        if util.ResolveVolumeMode(pvcSpec.VolumeMode) == corev1.PersistentVolumeFilesystem {
×
1502
                fsOverhead, err := GetFilesystemOverheadForStorageClass(ctx, c, pvcSpec.StorageClassName)
×
1503
                if err != nil {
×
1504
                        return resource.Quantity{}, err
×
1505
                }
×
1506
                // Parse filesystem overhead (percentage) into a 64-bit float
1507
                fsOverheadFloat, _ := strconv.ParseFloat(string(fsOverhead), 64)
×
1508

×
1509
                // Merge the previous values into a 'resource.Quantity' struct
×
1510
                requiredSpace := util.GetRequiredSpace(fsOverheadFloat, imgSize)
×
1511
                returnSize = *resource.NewScaledQuantity(requiredSpace, 0)
×
1512
        } else {
×
1513
                // Inflation is not needed with 'Block' mode
×
1514
                returnSize = *resource.NewScaledQuantity(imgSize, 0)
×
1515
        }
×
1516

1517
        return returnSize, nil
×
1518
}
1519

1520
// IsBound returns if the pvc is bound
1521
func IsBound(pvc *corev1.PersistentVolumeClaim) bool {
×
1522
        return pvc != nil && pvc.Status.Phase == corev1.ClaimBound
×
1523
}
×
1524

1525
// IsUnbound returns if the pvc is not bound yet
1526
func IsUnbound(pvc *corev1.PersistentVolumeClaim) bool {
×
1527
        return !IsBound(pvc)
×
1528
}
×
1529

1530
// IsLost returns if the pvc is lost
1531
func IsLost(pvc *corev1.PersistentVolumeClaim) bool {
×
1532
        return pvc != nil && pvc.Status.Phase == corev1.ClaimLost
×
1533
}
×
1534

1535
// IsImageStream returns true if registry source is ImageStream
1536
func IsImageStream(pvc *corev1.PersistentVolumeClaim) bool {
×
1537
        return pvc.Annotations[AnnRegistryImageStream] == "true"
×
1538
}
×
1539

1540
// ShouldIgnorePod checks if a pod should be ignored.
1541
// If this is a completed pod that was used for one checkpoint of a multi-stage import, it
1542
// should be ignored by pod lookups as long as the retainAfterCompletion annotation is set.
1543
func ShouldIgnorePod(pod *corev1.Pod, pvc *corev1.PersistentVolumeClaim) bool {
×
1544
        retain := pvc.ObjectMeta.Annotations[AnnPodRetainAfterCompletion]
×
1545
        checkpoint := pvc.ObjectMeta.Annotations[AnnCurrentCheckpoint]
×
1546
        if checkpoint != "" && pod.Status.Phase == corev1.PodSucceeded {
×
1547
                return retain == "true"
×
1548
        }
×
1549
        return false
×
1550
}
1551

1552
// BuildHTTPClient returns httpClient unchanged when it is non-nil. Otherwise it
// generates an http client that accepts any certificate, since we are using
// it to get prometheus data it doesn't matter if someone can intercept the data. Once we have
// a mechanism to properly sign the server, we can update this method to get a proper client.
//
// Connection settings are copied from http.DefaultTransport when it is still an
// *http.Transport. If it has been replaced by another http.RoundTripper, the
// new transport only sets the environment proxy and the TLS config instead of
// panicking on the type assertion.
func BuildHTTPClient(httpClient *http.Client) *http.Client {
	if httpClient != nil {
		return httpClient
	}

	// Create new Transport that ignores self-signed SSL
	//nolint:gosec
	tr := &http.Transport{
		Proxy:           http.ProxyFromEnvironment,
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
	}
	if defaultTransport, ok := http.DefaultTransport.(*http.Transport); ok {
		tr.Proxy = defaultTransport.Proxy
		tr.DialContext = defaultTransport.DialContext
		tr.MaxIdleConns = defaultTransport.MaxIdleConns
		tr.IdleConnTimeout = defaultTransport.IdleConnTimeout
		tr.ExpectContinueTimeout = defaultTransport.ExpectContinueTimeout
		tr.TLSHandshakeTimeout = defaultTransport.TLSHandshakeTimeout
	}

	return &http.Client{
		Transport: tr,
	}
}
1575

1576
// ErrConnectionRefused checks for connection refused errors by looking for the
// substring "connection refused" in the error message. A nil error is never a
// connection refused error.
func ErrConnectionRefused(err error) bool {
	if err == nil {
		// Calling Error() on a nil error would panic.
		return false
	}
	return strings.Contains(err.Error(), "connection refused")
}
×
1580

1581
// GetPodMetricsPort returns, if exists, the metrics port from the passed pod
1582
func GetPodMetricsPort(pod *corev1.Pod) (int, error) {
1✔
1583
        for _, container := range pod.Spec.Containers {
2✔
1584
                for _, port := range container.Ports {
2✔
1585
                        if port.Name == "metrics" {
2✔
1586
                                return int(port.ContainerPort), nil
1✔
1587
                        }
1✔
1588
                }
1589
        }
1590
        return 0, errors.New("Metrics port not found in pod")
1✔
1591
}
1592

1593
// GetMetricsURL builds the metrics URL according to the specified pod
1594
func GetMetricsURL(pod *corev1.Pod) (string, error) {
1✔
1595
        if pod == nil {
1✔
1596
                return "", nil
×
1597
        }
×
1598
        port, err := GetPodMetricsPort(pod)
1✔
1599
        if err != nil || pod.Status.PodIP == "" {
2✔
1600
                return "", err
1✔
1601
        }
1✔
1602
        domain := net.JoinHostPort(pod.Status.PodIP, fmt.Sprint(port))
1✔
1603
        url := fmt.Sprintf("https://%s/metrics", domain)
1✔
1604
        return url, nil
1✔
1605
}
1606

1607
// GetProgressReportFromURL fetches the progress report from the passed URL according to an specific metric expression and ownerUID
1608
func GetProgressReportFromURL(ctx context.Context, url string, httpClient *http.Client, metricExp, ownerUID string) (string, error) {
×
1609
        regExp := regexp.MustCompile(fmt.Sprintf("(%s)\\{ownerUID\\=%q\\} (\\d{1,3}\\.?\\d*)", metricExp, ownerUID))
×
1610
        // pod could be gone, don't block an entire thread for 30 seconds
×
1611
        // just to get back an i/o timeout
×
1612
        ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
×
1613
        defer cancel()
×
1614
        req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
×
1615
        if err != nil {
×
1616
                return "", err
×
1617
        }
×
1618
        resp, err := httpClient.Do(req)
×
1619
        if err != nil {
×
1620
                if ErrConnectionRefused(err) {
×
1621
                        return "", nil
×
1622
                }
×
1623
                return "", err
×
1624
        }
1625
        defer resp.Body.Close()
×
1626
        body, err := io.ReadAll(resp.Body)
×
1627
        if err != nil {
×
1628
                return "", err
×
1629
        }
×
1630

1631
        // Parse the progress from the body
1632
        progressReport := ""
×
1633
        match := regExp.FindStringSubmatch(string(body))
×
1634
        if match != nil {
×
1635
                progressReport = match[len(match)-1]
×
1636
        }
×
1637
        return progressReport, nil
×
1638
}
1639

1640
// UpdateHTTPAnnotations updates the passed annotations for proper http import
1641
func UpdateHTTPAnnotations(annotations map[string]string, http *cdiv1.DataVolumeSourceHTTP) {
×
1642
        annotations[AnnEndpoint] = http.URL
×
1643
        annotations[AnnSource] = SourceHTTP
×
1644

×
1645
        if http.SecretRef != "" {
×
1646
                annotations[AnnSecret] = http.SecretRef
×
1647
        }
×
1648
        if http.CertConfigMap != "" {
×
1649
                annotations[AnnCertConfigMap] = http.CertConfigMap
×
1650
        }
×
1651
        for index, header := range http.ExtraHeaders {
×
1652
                annotations[fmt.Sprintf("%s.%d", AnnExtraHeaders, index)] = header
×
1653
        }
×
1654
        for index, header := range http.SecretExtraHeaders {
×
1655
                annotations[fmt.Sprintf("%s.%d", AnnSecretExtraHeaders, index)] = header
×
1656
        }
×
1657
}
1658

1659
// UpdateS3Annotations updates the passed annotations for proper S3 import
1660
func UpdateS3Annotations(annotations map[string]string, s3 *cdiv1.DataVolumeSourceS3) {
×
1661
        annotations[AnnEndpoint] = s3.URL
×
1662
        annotations[AnnSource] = SourceS3
×
1663
        if s3.SecretRef != "" {
×
1664
                annotations[AnnSecret] = s3.SecretRef
×
1665
        }
×
1666
        if s3.CertConfigMap != "" {
×
1667
                annotations[AnnCertConfigMap] = s3.CertConfigMap
×
1668
        }
×
1669
}
1670

1671
// UpdateGCSAnnotations updates the passed annotations for proper GCS import
1672
func UpdateGCSAnnotations(annotations map[string]string, gcs *cdiv1.DataVolumeSourceGCS) {
×
1673
        annotations[AnnEndpoint] = gcs.URL
×
1674
        annotations[AnnSource] = SourceGCS
×
1675
        if gcs.SecretRef != "" {
×
1676
                annotations[AnnSecret] = gcs.SecretRef
×
1677
        }
×
1678
}
1679

1680
// UpdateRegistryAnnotations updates the passed annotations for proper registry import
1681
func UpdateRegistryAnnotations(annotations map[string]string, registry *cdiv1.DataVolumeSourceRegistry) {
×
1682
        annotations[AnnSource] = SourceRegistry
×
1683
        pullMethod := registry.PullMethod
×
1684
        if pullMethod != nil && *pullMethod != "" {
×
1685
                annotations[AnnRegistryImportMethod] = string(*pullMethod)
×
1686
        }
×
1687
        url := registry.URL
×
1688
        if url != nil && *url != "" {
×
1689
                annotations[AnnEndpoint] = *url
×
1690
        } else {
×
1691
                imageStream := registry.ImageStream
×
1692
                if imageStream != nil && *imageStream != "" {
×
1693
                        annotations[AnnEndpoint] = *imageStream
×
1694
                        annotations[AnnRegistryImageStream] = "true"
×
1695
                }
×
1696
        }
1697
        secretRef := registry.SecretRef
×
1698
        if secretRef != nil && *secretRef != "" {
×
1699
                annotations[AnnSecret] = *secretRef
×
1700
        }
×
1701
        certConfigMap := registry.CertConfigMap
×
1702
        if certConfigMap != nil && *certConfigMap != "" {
×
1703
                annotations[AnnCertConfigMap] = *certConfigMap
×
1704
        }
×
1705

1706
        if registry.Platform != nil && registry.Platform.Architecture != "" {
×
1707
                annotations[AnnRegistryImageArchitecture] = registry.Platform.Architecture
×
1708
        }
×
1709
}
1710

1711
// UpdateVDDKAnnotations updates the passed annotations for proper VDDK import
1712
func UpdateVDDKAnnotations(annotations map[string]string, vddk *cdiv1.DataVolumeSourceVDDK) {
×
1713
        annotations[AnnEndpoint] = vddk.URL
×
1714
        annotations[AnnSource] = SourceVDDK
×
1715
        annotations[AnnSecret] = vddk.SecretRef
×
1716
        annotations[AnnBackingFile] = vddk.BackingFile
×
1717
        annotations[AnnUUID] = vddk.UUID
×
1718
        annotations[AnnThumbprint] = vddk.Thumbprint
×
1719
        if vddk.InitImageURL != "" {
×
1720
                annotations[AnnVddkInitImageURL] = vddk.InitImageURL
×
1721
        }
×
1722
        if vddk.ExtraArgs != "" {
×
1723
                annotations[AnnVddkExtraArgs] = vddk.ExtraArgs
×
1724
        }
×
1725
}
1726

1727
// UpdateImageIOAnnotations updates the passed annotations for proper imageIO import
1728
func UpdateImageIOAnnotations(annotations map[string]string, imageio *cdiv1.DataVolumeSourceImageIO) {
×
1729
        annotations[AnnEndpoint] = imageio.URL
×
1730
        annotations[AnnSource] = SourceImageio
×
1731
        annotations[AnnSecret] = imageio.SecretRef
×
1732
        annotations[AnnCertConfigMap] = imageio.CertConfigMap
×
1733
        annotations[AnnDiskID] = imageio.DiskID
×
1734
}
×
1735

1736
// IsPVBoundToPVC checks if a PV is bound to a specific PVC
1737
func IsPVBoundToPVC(pv *corev1.PersistentVolume, pvc *corev1.PersistentVolumeClaim) bool {
1✔
1738
        claimRef := pv.Spec.ClaimRef
1✔
1739
        return claimRef != nil && claimRef.Name == pvc.Name && claimRef.Namespace == pvc.Namespace && claimRef.UID == pvc.UID
1✔
1740
}
1✔
1741

1742
// Rebind binds the PV of source to target
1743
func Rebind(ctx context.Context, c client.Client, source, target *corev1.PersistentVolumeClaim) error {
1✔
1744
        pv := &corev1.PersistentVolume{
1✔
1745
                ObjectMeta: metav1.ObjectMeta{
1✔
1746
                        Name: source.Spec.VolumeName,
1✔
1747
                },
1✔
1748
        }
1✔
1749

1✔
1750
        if err := c.Get(ctx, client.ObjectKeyFromObject(pv), pv); err != nil {
2✔
1751
                return err
1✔
1752
        }
1✔
1753

1754
        // Examine the claimref for the PV and see if it's still bound to PVC'
1755
        if pv.Spec.ClaimRef == nil {
1✔
1756
                return fmt.Errorf("PV %s claimRef is nil", pv.Name)
×
1757
        }
×
1758

1759
        if !IsPVBoundToPVC(pv, source) {
2✔
1760
                // Something is not right if the PV is neither bound to PVC' nor target PVC
1✔
1761
                if !IsPVBoundToPVC(pv, target) {
2✔
1762
                        klog.Errorf("PV bound to unexpected PVC: Could not rebind to target PVC '%s'", target.Name)
1✔
1763
                        return fmt.Errorf("PV %s bound to unexpected claim %s", pv.Name, pv.Spec.ClaimRef.Name)
1✔
1764
                }
1✔
1765
                // our work is done
1766
                return nil
1✔
1767
        }
1768

1769
        // Rebind PVC to target PVC
1770
        pv.Spec.ClaimRef = &corev1.ObjectReference{
1✔
1771
                Namespace:       target.Namespace,
1✔
1772
                Name:            target.Name,
1✔
1773
                UID:             target.UID,
1✔
1774
                ResourceVersion: target.ResourceVersion,
1✔
1775
        }
1✔
1776
        klog.V(3).Info("Rebinding PV to target PVC", "PVC", target.Name)
1✔
1777
        if err := c.Update(context.TODO(), pv); err != nil {
1✔
1778
                return err
×
1779
        }
×
1780

1781
        return nil
1✔
1782
}
1783

1784
// BulkDeleteResources deletes a bunch of resources
1785
func BulkDeleteResources(ctx context.Context, c client.Client, obj client.ObjectList, lo client.ListOption) error {
×
1786
        if err := c.List(ctx, obj, lo); err != nil {
×
1787
                if meta.IsNoMatchError(err) {
×
1788
                        return nil
×
1789
                }
×
1790
                return err
×
1791
        }
1792

1793
        sv := reflect.ValueOf(obj).Elem()
×
1794
        iv := sv.FieldByName("Items")
×
1795

×
1796
        for i := 0; i < iv.Len(); i++ {
×
1797
                obj := iv.Index(i).Addr().Interface().(client.Object)
×
1798
                if obj.GetDeletionTimestamp().IsZero() {
×
1799
                        klog.V(3).Infof("Deleting type %+v %+v", reflect.TypeOf(obj), obj)
×
1800
                        if err := c.Delete(ctx, obj); err != nil {
×
1801
                                return err
×
1802
                        }
×
1803
                }
1804
        }
1805

1806
        return nil
×
1807
}
1808

1809
// ValidateSnapshotCloneSize does proper size validation when doing a clone from snapshot operation
1810
func ValidateSnapshotCloneSize(snapshot *snapshotv1.VolumeSnapshot, pvcSpec *corev1.PersistentVolumeClaimSpec, targetSC *storagev1.StorageClass, log logr.Logger) (bool, error) {
×
1811
        restoreSize := snapshot.Status.RestoreSize
×
1812
        if restoreSize == nil {
×
1813
                return false, fmt.Errorf("snapshot has no RestoreSize")
×
1814
        }
×
1815
        targetRequest, hasTargetRequest := pvcSpec.Resources.Requests[corev1.ResourceStorage]
×
1816
        allowExpansion := targetSC.AllowVolumeExpansion != nil && *targetSC.AllowVolumeExpansion
×
1817
        if hasTargetRequest {
×
1818
                // otherwise will just use restoreSize
×
1819
                if restoreSize.Cmp(targetRequest) < 0 && !allowExpansion {
×
1820
                        log.V(3).Info("Can't expand restored PVC because SC does not allow expansion, need to fall back to host assisted")
×
1821
                        return false, nil
×
1822
                }
×
1823
        }
1824
        return true, nil
×
1825
}
1826

1827
// ValidateSnapshotCloneProvisioners validates the target PVC storage class against the snapshot class provisioner
1828
func ValidateSnapshotCloneProvisioners(vsc *snapshotv1.VolumeSnapshotContent, storageClass *storagev1.StorageClass) (bool, error) {
×
1829
        // Do snapshot and storage class validation
×
1830
        if storageClass == nil {
×
1831
                return false, fmt.Errorf("target storage class not found")
×
1832
        }
×
1833
        if storageClass.Provisioner != vsc.Spec.Driver {
×
1834
                return false, nil
×
1835
        }
×
1836
        // TODO: get sourceVolumeMode from volumesnapshotcontent and validate against target spec
1837
        // currently don't have CRDs in CI with sourceVolumeMode which is pretty new
1838
        // converting volume mode is possible but has security implications
1839
        return true, nil
×
1840
}
1841

1842
// GetSnapshotClassForSmartClone looks up the snapshot class based on the storage class
1843
func GetSnapshotClassForSmartClone(pvc *corev1.PersistentVolumeClaim, targetPvcStorageClassName, snapshotClassName *string, log logr.Logger, client client.Client, recorder record.EventRecorder) (string, error) {
×
1844
        logger := log.WithName("GetSnapshotClassForSmartClone").V(3)
×
1845
        // Check if relevant CRDs are available
×
1846
        if !isCsiCrdsDeployed(client, log) {
×
1847
                logger.Info("Missing CSI snapshotter CRDs, falling back to host assisted clone")
×
1848
                return "", nil
×
1849
        }
×
1850

1851
        targetStorageClass, err := GetStorageClassByNameWithK8sFallback(context.TODO(), client, targetPvcStorageClassName)
×
1852
        if err != nil {
×
1853
                return "", err
×
1854
        }
×
1855
        if targetStorageClass == nil {
×
1856
                logger.Info("Target PVC's Storage Class not found")
×
1857
                return "", nil
×
1858
        }
×
1859

1860
        vscName, err := GetVolumeSnapshotClass(context.TODO(), client, pvc, targetStorageClass.Provisioner, snapshotClassName, logger, recorder)
×
1861
        if err != nil {
×
1862
                return "", err
×
1863
        }
×
1864
        if vscName != nil {
×
1865
                if pvc != nil {
×
1866
                        logger.Info("smart-clone is applicable for datavolume", "datavolume",
×
1867
                                pvc.Name, "snapshot class", *vscName)
×
1868
                }
×
1869
                return *vscName, nil
×
1870
        }
1871

1872
        logger.Info("Could not match snapshotter with storage class, falling back to host assisted clone")
×
1873
        return "", nil
×
1874
}
1875

1876
// GetVolumeSnapshotClass looks up the snapshot class based on the driver and an optional specific name
1877
// In case of multiple candidates, it returns the default-annotated one, or the sorted list first one if no such default
1878
func GetVolumeSnapshotClass(ctx context.Context, c client.Client, pvc *corev1.PersistentVolumeClaim, driver string, snapshotClassName *string, log logr.Logger, recorder record.EventRecorder) (*string, error) {
×
1879
        logger := log.WithName("GetVolumeSnapshotClass").V(3)
×
1880

×
1881
        logEvent := func(message, vscName string) {
×
1882
                logger.Info(message, "name", vscName)
×
1883
                if pvc != nil {
×
1884
                        msg := fmt.Sprintf("%s %s", message, vscName)
×
1885
                        recorder.Event(pvc, corev1.EventTypeNormal, VolumeSnapshotClassSelected, msg)
×
1886
                }
×
1887
        }
1888

1889
        if snapshotClassName != nil {
×
1890
                vsc := &snapshotv1.VolumeSnapshotClass{}
×
1891
                if err := c.Get(context.TODO(), types.NamespacedName{Name: *snapshotClassName}, vsc); err != nil {
×
1892
                        return nil, err
×
1893
                }
×
1894
                if vsc.Driver == driver {
×
1895
                        logEvent(MessageStorageProfileVolumeSnapshotClassSelected, vsc.Name)
×
1896
                        return snapshotClassName, nil
×
1897
                }
×
1898
                return nil, nil
×
1899
        }
1900

1901
        vscList := &snapshotv1.VolumeSnapshotClassList{}
×
1902
        if err := c.List(ctx, vscList); err != nil {
×
1903
                if meta.IsNoMatchError(err) {
×
1904
                        return nil, nil
×
1905
                }
×
1906
                return nil, err
×
1907
        }
1908

1909
        var candidates []string
×
1910
        for _, vsc := range vscList.Items {
×
1911
                if vsc.Driver == driver {
×
1912
                        if vsc.Annotations[AnnDefaultSnapshotClass] == "true" {
×
1913
                                logEvent(MessageDefaultVolumeSnapshotClassSelected, vsc.Name)
×
1914
                                vscName := vsc.Name
×
1915
                                return &vscName, nil
×
1916
                        }
×
1917
                        candidates = append(candidates, vsc.Name)
×
1918
                }
1919
        }
1920

1921
        if len(candidates) > 0 {
×
1922
                sort.Strings(candidates)
×
1923
                logEvent(MessageFirstVolumeSnapshotClassSelected, candidates[0])
×
1924
                return &candidates[0], nil
×
1925
        }
×
1926

1927
        return nil, nil
×
1928
}
1929

1930
// isCsiCrdsDeployed checks whether the CSI snapshotter CRD are deployed
1931
func isCsiCrdsDeployed(c client.Client, log logr.Logger) bool {
×
1932
        version := "v1"
×
1933
        vsClass := "volumesnapshotclasses." + snapshotv1.GroupName
×
1934
        vsContent := "volumesnapshotcontents." + snapshotv1.GroupName
×
1935
        vs := "volumesnapshots." + snapshotv1.GroupName
×
1936

×
1937
        return isCrdDeployed(c, vsClass, version, log) &&
×
1938
                isCrdDeployed(c, vsContent, version, log) &&
×
1939
                isCrdDeployed(c, vs, version, log)
×
1940
}
×
1941

1942
// isCrdDeployed checks whether a CRD is deployed
1943
func isCrdDeployed(c client.Client, name, version string, log logr.Logger) bool {
×
1944
        crd := &extv1.CustomResourceDefinition{}
×
1945
        err := c.Get(context.TODO(), types.NamespacedName{Name: name}, crd)
×
1946
        if err != nil {
×
1947
                if !k8serrors.IsNotFound(err) {
×
1948
                        log.Info("Error looking up CRD", "crd name", name, "version", version, "error", err)
×
1949
                }
×
1950
                return false
×
1951
        }
1952

1953
        for _, v := range crd.Spec.Versions {
×
1954
                if v.Name == version && v.Served {
×
1955
                        return true
×
1956
                }
×
1957
        }
1958

1959
        return false
×
1960
}
1961

1962
// IsSnapshotReady indicates if a volume snapshot is ready to be used
1963
func IsSnapshotReady(snapshot *snapshotv1.VolumeSnapshot) bool {
×
1964
        return snapshot.Status != nil && snapshot.Status.ReadyToUse != nil && *snapshot.Status.ReadyToUse
×
1965
}
×
1966

1967
// GetResource updates given obj with the data of the object with the same name and namespace
1968
func GetResource(ctx context.Context, c client.Client, namespace, name string, obj client.Object) (bool, error) {
×
1969
        obj.SetNamespace(namespace)
×
1970
        obj.SetName(name)
×
1971

×
1972
        err := c.Get(ctx, client.ObjectKeyFromObject(obj), obj)
×
1973
        if err != nil {
×
1974
                if k8serrors.IsNotFound(err) {
×
1975
                        return false, nil
×
1976
                }
×
1977

1978
                return false, err
×
1979
        }
1980

1981
        return true, nil
×
1982
}
1983

1984
// PatchArgs are the args for Patch
1985
type PatchArgs struct {
1986
        Client client.Client
1987
        Log    logr.Logger
1988
        Obj    client.Object
1989
        OldObj client.Object
1990
}
1991

1992
// GetAnnotatedEventSource returns resource referenced by AnnEventSource annotations
1993
func GetAnnotatedEventSource(ctx context.Context, c client.Client, obj client.Object) (client.Object, error) {
×
1994
        esk, ok := obj.GetAnnotations()[AnnEventSourceKind]
×
1995
        if !ok {
×
1996
                return obj, nil
×
1997
        }
×
1998
        if esk != "PersistentVolumeClaim" {
×
1999
                return obj, nil
×
2000
        }
×
2001
        es, ok := obj.GetAnnotations()[AnnEventSource]
×
2002
        if !ok {
×
2003
                return obj, nil
×
2004
        }
×
2005
        namespace, name, err := cache.SplitMetaNamespaceKey(es)
×
2006
        if err != nil {
×
2007
                return nil, err
×
2008
        }
×
2009
        pvc := &corev1.PersistentVolumeClaim{
×
2010
                ObjectMeta: metav1.ObjectMeta{
×
2011
                        Namespace: namespace,
×
2012
                        Name:      name,
×
2013
                },
×
2014
        }
×
2015
        if err := c.Get(ctx, client.ObjectKeyFromObject(pvc), pvc); err != nil {
×
2016
                return nil, err
×
2017
        }
×
2018
        return pvc, nil
×
2019
}
2020

2021
// OwnedByDataVolume returns true if the object is owned by a DataVolume
2022
func OwnedByDataVolume(obj metav1.Object) bool {
×
2023
        owner := metav1.GetControllerOf(obj)
×
2024
        return owner != nil && owner.Kind == "DataVolume"
×
2025
}
×
2026

2027
// CopyAllowedAnnotations copies the allowed annotations from the source object
2028
// to the destination object
2029
func CopyAllowedAnnotations(srcObj, dstObj metav1.Object) {
×
2030
        for ann, def := range allowedAnnotations {
×
2031
                val, ok := srcObj.GetAnnotations()[ann]
×
2032
                if !ok && def != "" {
×
2033
                        val = def
×
2034
                }
×
2035
                if val != "" {
×
2036
                        klog.V(1).Info("Applying annotation", "Name", dstObj.GetName(), ann, val)
×
2037
                        AddAnnotation(dstObj, ann, val)
×
2038
                }
×
2039
        }
2040
}
2041

2042
// CopyAllowedLabels copies allowed labels matching the validLabelsMatch regexp from the
2043
// source map to the destination object allowing overwrites
2044
func CopyAllowedLabels(srcLabels map[string]string, dstObj metav1.Object, overwrite bool) {
1✔
2045
        for label, value := range srcLabels {
2✔
2046
                if _, found := dstObj.GetLabels()[label]; (!found || overwrite) && validLabelsMatch.MatchString(label) {
2✔
2047
                        AddLabel(dstObj, label, value)
1✔
2048
                }
1✔
2049
        }
2050
}
2051

2052
// ClaimMayExistBeforeDataVolume returns true if the PVC may exist before the DataVolume
2053
func ClaimMayExistBeforeDataVolume(c client.Client, pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume) (bool, error) {
×
2054
        if ClaimIsPopulatedForDataVolume(pvc, dv) {
×
2055
                return true, nil
×
2056
        }
×
2057
        return AllowClaimAdoption(c, pvc, dv)
×
2058
}
2059

2060
// ClaimIsPopulatedForDataVolume returns true if the PVC is populated for the given DataVolume
2061
func ClaimIsPopulatedForDataVolume(pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume) bool {
×
2062
        return pvc != nil && dv != nil && pvc.Annotations[AnnPopulatedFor] == dv.Name
×
2063
}
×
2064

2065
// AllowClaimAdoption returns true if the PVC may be adopted
2066
func AllowClaimAdoption(c client.Client, pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume) (bool, error) {
×
2067
        if pvc == nil || dv == nil {
×
2068
                return false, nil
×
2069
        }
×
2070
        anno, ok := pvc.Annotations[AnnCreatedForDataVolume]
×
2071
        if ok && anno == string(dv.UID) {
×
2072
                return false, nil
×
2073
        }
×
2074
        anno, ok = dv.Annotations[AnnAllowClaimAdoption]
×
2075
        // if annotation exists, go with that regardless of featuregate
×
2076
        if ok {
×
2077
                val, _ := strconv.ParseBool(anno)
×
2078
                return val, nil
×
2079
        }
×
2080
        return featuregates.NewFeatureGates(c).ClaimAdoptionEnabled()
×
2081
}
2082

2083
// ResolveDataSourceChain resolves a DataSource reference.
2084
// Returns an error if DataSource reference is not found or
2085
// DataSource reference points to another DataSource
2086
func ResolveDataSourceChain(ctx context.Context, client client.Client, dataSource *cdiv1.DataSource) (*cdiv1.DataSource, error) {
×
2087
        if dataSource.Spec.Source.DataSource == nil {
×
2088
                return dataSource, nil
×
2089
        }
×
2090

2091
        ref := dataSource.Spec.Source.DataSource
×
2092
        refNs := GetNamespace(ref.Namespace, dataSource.Namespace)
×
2093
        if ref.Name == dataSource.Name && refNs == dataSource.Namespace {
×
2094
                return nil, ErrDataSourceSelfReference
×
2095
        }
×
2096
        resolved := &cdiv1.DataSource{}
×
2097
        if err := client.Get(ctx, types.NamespacedName{Name: ref.Name, Namespace: refNs}, resolved); err != nil {
×
2098
                return nil, err
×
2099
        }
×
2100

2101
        if resolved.Spec.Source.DataSource != nil {
×
2102
                return nil, ErrDataSourceMaxDepthReached
×
2103
        }
×
2104

2105
        return resolved, nil
×
2106
}
2107

2108
func sortEvents(events *corev1.EventList, usingPopulator bool, pvcPrimeName string) {
1✔
2109
        // Sort event lists by containing primeName substring and most recent timestamp
1✔
2110
        sort.Slice(events.Items, func(i, j int) bool {
2✔
2111
                if usingPopulator {
2✔
2112
                        firstContainsPrime := strings.Contains(events.Items[i].Message, pvcPrimeName)
1✔
2113
                        secondContainsPrime := strings.Contains(events.Items[j].Message, pvcPrimeName)
1✔
2114

1✔
2115
                        if firstContainsPrime && !secondContainsPrime {
2✔
2116
                                return true
1✔
2117
                        }
1✔
2118
                        if !firstContainsPrime && secondContainsPrime {
2✔
2119
                                return false
1✔
2120
                        }
1✔
2121
                }
2122

2123
                // if the timestamps are the same, prioritze longer messages to make sure our sorting is deterministic
2124
                if events.Items[i].LastTimestamp.Time.Equal(events.Items[j].LastTimestamp.Time) {
1✔
NEW
2125
                        return len(events.Items[i].Message) > len(events.Items[j].Message)
×
NEW
2126
                }
×
2127

2128
                // if both contains primeName substring or neither, just sort on timestamp
2129
                return events.Items[i].LastTimestamp.Time.After(events.Items[j].LastTimestamp.Time)
1✔
2130
        })
2131
}
2132

2133
// UpdatePVCBoundContionFromEvents updates the bound condition annotations on the PVC based on recent events
2134
// This function can be used by both controller and populator packages to update PVC bound condition information
NEW
2135
func UpdatePVCBoundContionFromEvents(pvc *corev1.PersistentVolumeClaim, c client.Client, log logr.Logger) error {
×
NEW
2136
        currentPvcCopy := pvc.DeepCopy()
×
NEW
2137

×
NEW
2138
        anno := pvc.GetAnnotations()
×
NEW
2139
        if anno == nil {
×
NEW
2140
                return nil
×
NEW
2141
        }
×
2142

NEW
2143
        if IsBound(pvc) {
×
NEW
2144
                anno := pvc.GetAnnotations()
×
NEW
2145
                delete(anno, AnnBoundCondition)
×
NEW
2146
                delete(anno, AnnBoundConditionReason)
×
NEW
2147
                delete(anno, AnnBoundConditionMessage)
×
NEW
2148

×
NEW
2149
                if !reflect.DeepEqual(currentPvcCopy, pvc) {
×
NEW
2150
                        patch := client.MergeFrom(currentPvcCopy)
×
NEW
2151
                        if err := c.Patch(context.TODO(), pvc, patch); err != nil {
×
NEW
2152
                                return err
×
NEW
2153
                        }
×
2154
                }
2155

NEW
2156
                return nil
×
2157
        }
2158

NEW
2159
        if pvc.Status.Phase != corev1.ClaimPending {
×
NEW
2160
                return nil
×
NEW
2161
        }
×
2162

2163
        // set bound condition by getting the latest event
NEW
2164
        events := &corev1.EventList{}
×
NEW
2165

×
NEW
2166
        err := c.List(context.TODO(), events,
×
NEW
2167
                client.InNamespace(pvc.GetNamespace()),
×
NEW
2168
                client.MatchingFields{"involvedObject.name": pvc.GetName(),
×
NEW
2169
                        "involvedObject.uid": string(pvc.GetUID())},
×
NEW
2170
        )
×
NEW
2171

×
NEW
2172
        if err != nil {
×
NEW
2173
                // Log the error but don't fail the reconciliation
×
NEW
2174
                log.Error(err, "Unable to list events for PVC bound condition update", "pvc", pvc.Name)
×
NEW
2175
                return nil
×
NEW
2176
        }
×
2177

NEW
2178
        if len(events.Items) == 0 {
×
NEW
2179
                return nil
×
NEW
2180
        }
×
2181

NEW
2182
        pvcPrime, usingPopulator := anno[AnnPVCPrimeName]
×
NEW
2183

×
NEW
2184
        // Sort event lists by containing primeName substring and most recent timestamp
×
NEW
2185
        sortEvents(events, usingPopulator, pvcPrime)
×
NEW
2186

×
NEW
2187
        boundMessage := ""
×
NEW
2188
        // check if prime name annotation exists
×
NEW
2189
        if usingPopulator {
×
NEW
2190
                // if we are using populators get the latest event from prime pvc
×
NEW
2191
                pvcPrime = fmt.Sprintf("[%s] : ", pvcPrime)
×
NEW
2192

×
NEW
2193
                // if the first event does not contain a prime message, none will so return
×
NEW
2194
                primeIdx := strings.Index(events.Items[0].Message, pvcPrime)
×
NEW
2195
                if primeIdx == -1 {
×
NEW
2196
                        log.V(1).Info("No bound message found, skipping bound condition update", "pvc", pvc.Name)
×
NEW
2197
                        return nil
×
NEW
2198
                }
×
NEW
2199
                boundMessage = events.Items[0].Message[primeIdx+len(pvcPrime):]
×
NEW
2200
        } else {
×
NEW
2201
                // if not using populators just get the latest event
×
NEW
2202
                boundMessage = events.Items[0].Message
×
NEW
2203
        }
×
2204

2205
        // since we checked status of phase above, we know this is pending
NEW
2206
        anno[AnnBoundCondition] = "false"
×
NEW
2207
        anno[AnnBoundConditionReason] = "Pending"
×
NEW
2208
        anno[AnnBoundConditionMessage] = boundMessage
×
NEW
2209

×
NEW
2210
        patch := client.MergeFrom(currentPvcCopy)
×
NEW
2211
        if err := c.Patch(context.TODO(), pvc, patch); err != nil {
×
NEW
2212
                return err
×
NEW
2213
        }
×
2214

NEW
2215
        return nil
×
2216
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc