• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubevirt / containerized-data-importer / #5800

30 Jan 2026 06:07PM UTC coverage: 49.499% (+0.008%) from 49.491%
#5800

push

travis-ci

web-flow
Run go run ./robots/uploader -workspace /home/prow/go/src/github.com/kubevirt/project-infra/../containerized-data-importer/WORKSPACE -dry-run=false (#4020)

Signed-off-by: kubevirt-bot <kubevirtbot@redhat.com>

14712 of 29722 relevant lines covered (49.5%)

0.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

13.5
/pkg/controller/common/util.go
1
/*
2
Copyright 2022 The CDI Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
        http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package common
18

19
import (
20
        "context"
21
        "crypto/rand"
22
        "crypto/rsa"
23
        "crypto/tls"
24
        "fmt"
25
        "io"
26
        "net"
27
        "net/http"
28
        "reflect"
29
        "regexp"
30
        "sort"
31
        "strconv"
32
        "strings"
33
        "sync"
34
        "time"
35

36
        "github.com/go-logr/logr"
37
        snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
38
        ocpconfigv1 "github.com/openshift/api/config/v1"
39
        "github.com/pkg/errors"
40

41
        corev1 "k8s.io/api/core/v1"
42
        storagev1 "k8s.io/api/storage/v1"
43
        extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
44
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
45
        "k8s.io/apimachinery/pkg/api/meta"
46
        "k8s.io/apimachinery/pkg/api/resource"
47
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
48
        "k8s.io/apimachinery/pkg/labels"
49
        "k8s.io/apimachinery/pkg/runtime"
50
        "k8s.io/apimachinery/pkg/types"
51
        "k8s.io/apimachinery/pkg/util/sets"
52
        "k8s.io/client-go/tools/cache"
53
        "k8s.io/client-go/tools/record"
54
        "k8s.io/klog/v2"
55
        "k8s.io/utils/ptr"
56

57
        runtimecache "sigs.k8s.io/controller-runtime/pkg/cache"
58
        "sigs.k8s.io/controller-runtime/pkg/client"
59
        "sigs.k8s.io/controller-runtime/pkg/client/fake"
60

61
        cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
62
        cdiv1utils "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1/utils"
63
        "kubevirt.io/containerized-data-importer/pkg/client/clientset/versioned/scheme"
64
        "kubevirt.io/containerized-data-importer/pkg/common"
65
        featuregates "kubevirt.io/containerized-data-importer/pkg/feature-gates"
66
        "kubevirt.io/containerized-data-importer/pkg/token"
67
        "kubevirt.io/containerized-data-importer/pkg/util"
68
        sdkapi "kubevirt.io/controller-lifecycle-operator-sdk/api"
69
)
70

71
const (
72
        // DataVolName provides a const to use for creating volumes in pod specs
73
        DataVolName = "cdi-data-vol"
74

75
        // ScratchVolName provides a const to use for creating scratch pvc volumes in pod specs
76
        ScratchVolName = "cdi-scratch-vol"
77

78
        // AnnAPIGroup is the APIGroup for CDI
79
        AnnAPIGroup = "cdi.kubevirt.io"
80
        // AnnCreatedBy is a pod annotation indicating if the pod was created by the PVC
81
        AnnCreatedBy = AnnAPIGroup + "/storage.createdByController"
82
        // AnnPodPhase is a PVC annotation indicating the related pod progress (phase)
83
        AnnPodPhase = AnnAPIGroup + "/storage.pod.phase"
84
        // AnnPodReady tells whether the pod is ready
85
        AnnPodReady = AnnAPIGroup + "/storage.pod.ready"
86
        // AnnPodRestarts is a PVC annotation that tells how many times a related pod was restarted
87
        AnnPodRestarts = AnnAPIGroup + "/storage.pod.restarts"
88
        // AnnPodSchedulable is a PVC annotation that tells if the Pod is schedulable or not
89
        AnnPodSchedulable = AnnAPIGroup + "/storage.pod.schedulable"
90
        // AnnPopulatedFor is a PVC annotation telling the datavolume controller that the PVC is already populated
91
        AnnPopulatedFor = AnnAPIGroup + "/storage.populatedFor"
92
        // AnnPrePopulated is a PVC annotation telling the datavolume controller that the PVC is already populated
93
        AnnPrePopulated = AnnAPIGroup + "/storage.prePopulated"
94
        // AnnPriorityClassName is PVC annotation to indicate the priority class name for importer, cloner and uploader pod
95
        AnnPriorityClassName = AnnAPIGroup + "/storage.pod.priorityclassname"
96
        // AnnExternalPopulation annotation marks a PVC as "externally populated", allowing the import-controller to skip it
97
        AnnExternalPopulation = AnnAPIGroup + "/externalPopulation"
98

99
        // AnnPodRetainAfterCompletion is PVC annotation for retaining transfer pods after completion
100
        AnnPodRetainAfterCompletion = AnnAPIGroup + "/storage.pod.retainAfterCompletion"
101

102
        // AnnPreviousCheckpoint provides a const to indicate the previous snapshot for a multistage import
103
        AnnPreviousCheckpoint = AnnAPIGroup + "/storage.checkpoint.previous"
104
        // AnnCurrentCheckpoint provides a const to indicate the current snapshot for a multistage import
105
        AnnCurrentCheckpoint = AnnAPIGroup + "/storage.checkpoint.current"
106
        // AnnFinalCheckpoint provides a const to indicate whether the current checkpoint is the last one
107
        AnnFinalCheckpoint = AnnAPIGroup + "/storage.checkpoint.final"
108
        // AnnCheckpointsCopied is a prefix for recording which checkpoints have already been copied
109
        AnnCheckpointsCopied = AnnAPIGroup + "/storage.checkpoint.copied"
110

111
        // AnnCurrentPodID keeps track of the latest pod servicing this PVC
112
        AnnCurrentPodID = AnnAPIGroup + "/storage.checkpoint.pod.id"
113
        // AnnMultiStageImportDone marks a multi-stage import as totally finished
114
        AnnMultiStageImportDone = AnnAPIGroup + "/storage.checkpoint.done"
115

116
        // AnnPopulatorProgress is a standard annotation that can be used for progress reporting
117
        AnnPopulatorProgress = AnnAPIGroup + "/storage.populator.progress"
118

119
        // AnnPreallocationRequested provides a const to indicate whether preallocation should be performed on the PV
120
        AnnPreallocationRequested = AnnAPIGroup + "/storage.preallocation.requested"
121
        // AnnPreallocationApplied provides a const for PVC preallocation annotation
122
        AnnPreallocationApplied = AnnAPIGroup + "/storage.preallocation"
123

124
        // AnnRunningCondition provides a const for the running condition
125
        AnnRunningCondition = AnnAPIGroup + "/storage.condition.running"
126
        // AnnRunningConditionMessage provides a const for the running condition message
127
        AnnRunningConditionMessage = AnnAPIGroup + "/storage.condition.running.message"
128
        // AnnRunningConditionReason provides a const for the running condition reason
129
        AnnRunningConditionReason = AnnAPIGroup + "/storage.condition.running.reason"
130

131
        // AnnBoundCondition provides a const for the bound condition
132
        AnnBoundCondition = AnnAPIGroup + "/storage.condition.bound"
133
        // AnnBoundConditionMessage provides a const for the bound condition message
134
        AnnBoundConditionMessage = AnnAPIGroup + "/storage.condition.bound.message"
135
        // AnnBoundConditionReason provides a const for the bound condition reason
136
        AnnBoundConditionReason = AnnAPIGroup + "/storage.condition.bound.reason"
137

138
        // AnnSourceRunningCondition provides a const for the source running condition
139
        AnnSourceRunningCondition = AnnAPIGroup + "/storage.condition.source.running"
140
        // AnnSourceRunningConditionMessage provides a const for the source running condition message
141
        AnnSourceRunningConditionMessage = AnnAPIGroup + "/storage.condition.source.running.message"
142
        // AnnSourceRunningConditionReason provides a const for the source running condition reason
143
        AnnSourceRunningConditionReason = AnnAPIGroup + "/storage.condition.source.running.reason"
144

145
        // AnnVddkVersion shows the last VDDK library version used by a DV's importer pod
146
        AnnVddkVersion = AnnAPIGroup + "/storage.pod.vddk.version"
147
        // AnnVddkHostConnection shows the last ESX host that serviced a DV's importer pod
148
        AnnVddkHostConnection = AnnAPIGroup + "/storage.pod.vddk.host"
149
        // AnnVddkInitImageURL saves a per-DV VDDK image URL on the PVC
150
        AnnVddkInitImageURL = AnnAPIGroup + "/storage.pod.vddk.initimageurl"
151
        // AnnVddkExtraArgs references a ConfigMap that holds arguments to pass directly to the VDDK library
152
        AnnVddkExtraArgs = AnnAPIGroup + "/storage.pod.vddk.extraargs"
153

154
        // AnnRequiresScratch provides a const for our PVC requiring scratch annotation
155
        AnnRequiresScratch = AnnAPIGroup + "/storage.import.requiresScratch"
156

157
        // AnnRequiresDirectIO provides a const for our PVC requiring direct io annotation (due to OOMs we need to try qemu cache=none)
158
        AnnRequiresDirectIO = AnnAPIGroup + "/storage.import.requiresDirectIo"
159
        // OOMKilledReason provides a value that container runtimes must return in the reason field for an OOMKilled container
160
        OOMKilledReason = "OOMKilled"
161

162
        // AnnContentType provides a const for the PVC content-type
163
        AnnContentType = AnnAPIGroup + "/storage.contentType"
164

165
        // AnnSource provide a const for our PVC import source annotation
166
        AnnSource = AnnAPIGroup + "/storage.import.source"
167
        // AnnEndpoint provides a const for our PVC endpoint annotation
168
        AnnEndpoint = AnnAPIGroup + "/storage.import.endpoint"
169

170
        // AnnSecret provides a const for our PVC secretName annotation
171
        AnnSecret = AnnAPIGroup + "/storage.import.secretName"
172
        // AnnCertConfigMap is the name of a configmap containing tls certs
173
        AnnCertConfigMap = AnnAPIGroup + "/storage.import.certConfigMap"
174
        // AnnRegistryImportMethod provides a const for registry import method annotation
175
        AnnRegistryImportMethod = AnnAPIGroup + "/storage.import.registryImportMethod"
176
        // AnnRegistryImageStream provides a const for registry image stream annotation
177
        AnnRegistryImageStream = AnnAPIGroup + "/storage.import.registryImageStream"
178
        // AnnImportPod provides a const for our PVC importPodName annotation
179
        AnnImportPod = AnnAPIGroup + "/storage.import.importPodName"
180
        // AnnDiskID provides a const for our PVC diskId annotation
181
        AnnDiskID = AnnAPIGroup + "/storage.import.diskId"
182
        // AnnUUID provides a const for our PVC uuid annotation
183
        AnnUUID = AnnAPIGroup + "/storage.import.uuid"
184
        // AnnInsecureSkipVerify provides a const for skipping certificate verification
185
        AnnInsecureSkipVerify = AnnAPIGroup + "/storage.import.insecureSkipVerify"
186
        // AnnBackingFile provides a const for our PVC backing file annotation
187
        AnnBackingFile = AnnAPIGroup + "/storage.import.backingFile"
188
        // AnnThumbprint provides a const for our PVC backing thumbprint annotation
189
        AnnThumbprint = AnnAPIGroup + "/storage.import.vddk.thumbprint"
190
        // AnnExtraHeaders provides a const for our PVC extraHeaders annotation
191
        AnnExtraHeaders = AnnAPIGroup + "/storage.import.extraHeaders"
192
        // AnnSecretExtraHeaders provides a const for our PVC secretExtraHeaders annotation
193
        AnnSecretExtraHeaders = AnnAPIGroup + "/storage.import.secretExtraHeaders"
194
        // AnnRegistryImageArchitecture provides a const for our PVC registryImageArchitecture annotation
195
        AnnRegistryImageArchitecture = AnnAPIGroup + "/storage.import.registryImageArchitecture"
196

197
        // AnnCloneToken is the annotation containing the clone token
198
        AnnCloneToken = AnnAPIGroup + "/storage.clone.token"
199
        // AnnExtendedCloneToken is the annotation containing the long term clone token
200
        AnnExtendedCloneToken = AnnAPIGroup + "/storage.extended.clone.token"
201
        // AnnPermissiveClone annotation allows the clone-controller to skip the clone size validation
202
        AnnPermissiveClone = AnnAPIGroup + "/permissiveClone"
203
        // AnnOwnerUID annotation has the owner UID
204
        AnnOwnerUID = AnnAPIGroup + "/ownerUID"
205
        // AnnCloneType is the computed/requested clone type
206
        AnnCloneType = AnnAPIGroup + "/cloneType"
207
        // AnnCloneSourcePod name of the source clone pod
208
        AnnCloneSourcePod = AnnAPIGroup + "/storage.sourceClonePodName"
209

210
        // AnnUploadRequest marks that a PVC should be made available for upload
211
        AnnUploadRequest = AnnAPIGroup + "/storage.upload.target"
212

213
        // AnnCheckStaticVolume checks if a statically allocated PV exists before creating the target PVC.
214
        // If so, PVC is still created but population is skipped
215
        AnnCheckStaticVolume = AnnAPIGroup + "/storage.checkStaticVolume"
216

217
        // AnnPersistentVolumeList is an annotation storing a list of PV names
218
        AnnPersistentVolumeList = AnnAPIGroup + "/storage.persistentVolumeList"
219

220
        // AnnPopulatorKind annotation is added to a PVC' to specify the population kind, so it's later
221
        // checked by the common populator watches.
222
        AnnPopulatorKind = AnnAPIGroup + "/storage.populator.kind"
223
        // AnnUsePopulator annotation indicates if the datavolume population will use populators
224
        AnnUsePopulator = AnnAPIGroup + "/storage.usePopulator"
225

226
        // AnnMinimumSupportedPVCSize annotation on a StorageProfile specifies its minimum supported PVC size
227
        AnnMinimumSupportedPVCSize = AnnAPIGroup + "/minimumSupportedPvcSize"
228
        // AnnUseReadWriteOnceForDataImportCron annotation on a StorageProfile signals that DataImportCron should use RWO for DataImportCron PVCs
229
        AnnUseReadWriteOnceForDataImportCron = AnnAPIGroup + "/useReadWriteOnceForDataImportCron"
230
        // AnnSnapshotClassForDataImportCron annotation on a StorageProfile specifies the VolumeSnapshotClass to use for DataImportCron snapshots
231
        AnnSnapshotClassForDataImportCron = AnnAPIGroup + "/snapshotClassForDataImportCron"
232

233
        // AnnDefaultStorageClass is the annotation indicating that a storage class is the default one
234
        AnnDefaultStorageClass = "storageclass.kubernetes.io/is-default-class"
235
        // AnnDefaultVirtStorageClass is the annotation indicating that a storage class is the default one for virtualization purposes
236
        AnnDefaultVirtStorageClass = "storageclass.kubevirt.io/is-default-virt-class"
237
        // AnnDefaultSnapshotClass is the annotation indicating that a snapshot class is the default one
238
        AnnDefaultSnapshotClass = "snapshot.storage.kubernetes.io/is-default-class"
239

240
        // AnnSourceVolumeMode is the volume mode of the source PVC specified as an annotation on snapshots
241
        AnnSourceVolumeMode = AnnAPIGroup + "/storage.import.sourceVolumeMode"
242
        // AnnAdvisedRestoreSize is the advised restore size for disks restored from the snapshot
243
        AnnAdvisedRestoreSize = AnnAPIGroup + "/storage.import.advisedRestoreSize"
244

245
        // AnnOpenShiftImageLookup is the annotation for OpenShift image stream lookup
246
        AnnOpenShiftImageLookup = "alpha.image.policy.openshift.io/resolve-names"
247

248
        // AnnCloneRequest sets our expected annotation for a CloneRequest
249
        AnnCloneRequest = "k8s.io/CloneRequest"
250
        // AnnCloneOf is used to indicate that cloning was complete
251
        AnnCloneOf = "k8s.io/CloneOf"
252

253
        // AnnPodNetwork is used for specifying Pod Network
254
        AnnPodNetwork = "k8s.v1.cni.cncf.io/networks"
255
        // AnnPodMultusDefaultNetwork is used for specifying default Pod Network
256
        AnnPodMultusDefaultNetwork = "v1.multus-cni.io/default-network"
257
        // AnnPodSidecarInjectionIstio is used for enabling/disabling Pod istio/AspenMesh sidecar injection
258
        AnnPodSidecarInjectionIstio = "sidecar.istio.io/inject"
259
        // AnnPodSidecarInjectionIstioDefault is the default value passed for AnnPodSidecarInjectionIstio
260
        AnnPodSidecarInjectionIstioDefault = "false"
261
        // AnnPodSidecarInjectionLinkerd is used to enable/disable linkerd sidecar injection
262
        AnnPodSidecarInjectionLinkerd = "linkerd.io/inject"
263
        // AnnPodSidecarInjectionLinkerdDefault is the default value passed for AnnPodSidecarInjectionLinkerd
264
        AnnPodSidecarInjectionLinkerdDefault = "disabled"
265

266
        // AnnImmediateBinding provides a const to indicate whether immediate binding should be performed on the PV (overrides global config)
267
        AnnImmediateBinding = AnnAPIGroup + "/storage.bind.immediate.requested"
268

269
        // AnnSelectedNode annotation is added to a PVC that has been triggered by scheduler to
270
        // be dynamically provisioned. Its value is the name of the selected node.
271
        AnnSelectedNode = "volume.kubernetes.io/selected-node"
272

273
        // CloneUniqueID is used as a special label to be used when we search for the pod
274
        CloneUniqueID = AnnAPIGroup + "/storage.clone.cloneUniqeId"
275

276
        // CloneSourceInUse is reason for event created when clone source pvc is in use
277
        CloneSourceInUse = "CloneSourceInUse"
278

279
        // CloneComplete message
280
        CloneComplete = "Clone Complete"
281

282
        cloneTokenLeeway = 10 * time.Second
283

284
        // Default value for preallocation option if not defined in DV or CDIConfig
285
        defaultPreallocation = false
286

287
        // ErrStartingPod provides a const to indicate that a pod wasn't able to start without providing sensitive information (reason)
288
        ErrStartingPod = "ErrStartingPod"
289
        // MessageErrStartingPod provides a const to indicate that a pod wasn't able to start without providing sensitive information (message)
290
        MessageErrStartingPod = "Error starting pod '%s': For more information, request access to cdi-deploy logs from your sysadmin"
291
        // ErrClaimNotValid provides a const to indicate a claim is not valid
292
        ErrClaimNotValid = "ErrClaimNotValid"
293
        // ErrExceededQuota provides a const to indicate the claim has exceeded the quota
294
        ErrExceededQuota = "ErrExceededQuota"
295
        // ErrIncompatiblePVC provides a const to indicate a clone is not possible due to an incompatible PVC
296
        ErrIncompatiblePVC = "ErrIncompatiblePVC"
297

298
        // SourceHTTP is the source type HTTP, if unspecified or invalid, it defaults to SourceHTTP
299
        SourceHTTP = "http"
300
        // SourceS3 is the source type S3
301
        SourceS3 = "s3"
302
        // SourceGCS is the source type GCS
303
        SourceGCS = "gcs"
304
        // SourceGlance is the source type of glance
305
        SourceGlance = "glance"
306
        // SourceNone means there is no source.
307
        SourceNone = "none"
308
        // SourceRegistry is the source type of Registry
309
        SourceRegistry = "registry"
310
        // SourceImageio is the source type ovirt-imageio
311
        SourceImageio = "imageio"
312
        // SourceVDDK is the source type of VDDK
313
        SourceVDDK = "vddk"
314

315
        // VolumeSnapshotClassSelected reports that a VolumeSnapshotClass was selected
316
        VolumeSnapshotClassSelected = "VolumeSnapshotClassSelected"
317
        // MessageStorageProfileVolumeSnapshotClassSelected reports that a VolumeSnapshotClass was selected according to StorageProfile
318
        MessageStorageProfileVolumeSnapshotClassSelected = "VolumeSnapshotClass selected according to StorageProfile"
319
        // MessageDefaultVolumeSnapshotClassSelected reports that the default VolumeSnapshotClass was selected
320
        MessageDefaultVolumeSnapshotClassSelected = "Default VolumeSnapshotClass selected"
321
        // MessageFirstVolumeSnapshotClassSelected reports that the first VolumeSnapshotClass was selected
322
        MessageFirstVolumeSnapshotClassSelected = "First VolumeSnapshotClass selected"
323

324
        // ClaimLost reason const
325
        ClaimLost = "ClaimLost"
326
        // NotFound reason const
327
        NotFound = "NotFound"
328

329
        // LabelDefaultInstancetype provides a default VirtualMachine{ClusterInstancetype,Instancetype} that can be used by a VirtualMachine booting from a given PVC
330
        LabelDefaultInstancetype = "instancetype.kubevirt.io/default-instancetype"
331
        // LabelDefaultInstancetypeKind provides a default kind of either VirtualMachineClusterInstancetype or VirtualMachineInstancetype
332
        LabelDefaultInstancetypeKind = "instancetype.kubevirt.io/default-instancetype-kind"
333
        // LabelDefaultPreference provides a default VirtualMachine{ClusterPreference,Preference} that can be used by a VirtualMachine booting from a given PVC
334
        LabelDefaultPreference = "instancetype.kubevirt.io/default-preference"
335
        // LabelDefaultPreferenceKind provides a default kind of either VirtualMachineClusterPreference or VirtualMachinePreference
336
        LabelDefaultPreferenceKind = "instancetype.kubevirt.io/default-preference-kind"
337

338
        // LabelDynamicCredentialSupport specifies if the OS supports updating credentials at runtime.
339
        //nolint:gosec // These are not credentials
340
        LabelDynamicCredentialSupport = "kubevirt.io/dynamic-credentials-support"
341

342
        // LabelExcludeFromVeleroBackup provides a const to indicate whether an object should be excluded from velero backup
343
        LabelExcludeFromVeleroBackup = "velero.io/exclude-from-backup"
344

345
        // ProgressDone this means we are DONE
346
        ProgressDone = "100.0%"
347

348
        // AnnEventSourceKind is the source kind that should be related to events
349
        AnnEventSourceKind = AnnAPIGroup + "/events.source.kind"
350
        // AnnEventSource is the source that should be related to events (namespace/name)
351
        AnnEventSource = AnnAPIGroup + "/events.source"
352

353
        // AnnAllowClaimAdoption is the annotation that allows a claim to be adopted by a DataVolume
354
        AnnAllowClaimAdoption = AnnAPIGroup + "/allowClaimAdoption"
355

356
        // AnnCdiCustomizeComponentHash annotation is a hash of all customizations that live under spec.CustomizeComponents
357
        AnnCdiCustomizeComponentHash = AnnAPIGroup + "/customizer-identifier"
358

359
        // AnnCreatedForDataVolume stores the UID of the datavolume that the PVC was created for
360
        AnnCreatedForDataVolume = AnnAPIGroup + "/createdForDataVolume"
361

362
        // AnnPVCPrimeName annotation is the name of the PVC' that is used to populate the PV which is then rebound to the target PVC
363
        AnnPVCPrimeName = AnnAPIGroup + "/storage.populator.pvcPrime"
364
)
365

366
// Size-detection pod error codes.
//
// The values are assigned sequentially with iota, so NoErr is 0 and
// ErrUnknown is 5. All constants in the group share the explicit int type of
// the first entry.
// NOTE(review): the per-code meanings below are read off the identifier names
// only; confirm against the size-detection pod implementation.
const (
        // NoErr is the zero code (presumably: no error occurred).
        NoErr int = iota
        // ErrBadArguments presumably signals invalid arguments.
        ErrBadArguments
        // ErrInvalidFile presumably signals an invalid file.
        ErrInvalidFile
        // ErrInvalidPath presumably signals an invalid path.
        ErrInvalidPath
        // ErrBadTermFile presumably signals a problem with the termination file.
        ErrBadTermFile
        // ErrUnknown presumably covers any other failure.
        ErrUnknown
)
375

376
var (
377
        // BlockMode is raw block device mode
378
        BlockMode = corev1.PersistentVolumeBlock
379
        // FilesystemMode is filesystem device mode
380
        FilesystemMode = corev1.PersistentVolumeFilesystem
381

382
        // DefaultInstanceTypeLabels is a list of currently supported default instance type labels
383
        DefaultInstanceTypeLabels = []string{
384
                LabelDefaultInstancetype,
385
                LabelDefaultInstancetypeKind,
386
                LabelDefaultPreference,
387
                LabelDefaultPreferenceKind,
388
        }
389

390
        apiServerKeyOnce sync.Once
391
        apiServerKey     *rsa.PrivateKey
392

393
        // allowedAnnotations is a list of annotations
394
        // that can be propagated from the pvc/dv to a pod
395
        allowedAnnotations = map[string]string{
396
                AnnPodNetwork:                 "",
397
                AnnPodSidecarInjectionIstio:   AnnPodSidecarInjectionIstioDefault,
398
                AnnPodSidecarInjectionLinkerd: AnnPodSidecarInjectionLinkerdDefault,
399
                AnnPriorityClassName:          "",
400
                AnnPodMultusDefaultNetwork:    "",
401
        }
402

403
        validLabelsMatch = regexp.MustCompile(`^([\w.]+\.kubevirt.io|kubevirt.io)/[\w.-]+$`)
404

405
        ErrDataSourceMaxDepthReached = errors.New("DataSource reference chain exceeds maximum depth of 1")
406
        ErrDataSourceSelfReference   = errors.New("DataSource cannot self-reference")
407
        ErrDataSourceCrossNamespace  = errors.New("DataSource cannot reference a DataSource in another namespace")
408
)
409

410
// FakeValidator is a fake token validator
411
type FakeValidator struct {
412
        Match     string
413
        Operation token.Operation
414
        Name      string
415
        Namespace string
416
        Resource  metav1.GroupVersionResource
417
        Params    map[string]string
418
}
419

420
// Validate is a fake token validation
421
func (v *FakeValidator) Validate(value string) (*token.Payload, error) {
×
422
        if value != v.Match {
×
423
                return nil, fmt.Errorf("token does not match expected")
×
424
        }
×
425
        resource := metav1.GroupVersionResource{
×
426
                Resource: "persistentvolumeclaims",
×
427
        }
×
428
        return &token.Payload{
×
429
                Name:      v.Name,
×
430
                Namespace: v.Namespace,
×
431
                Operation: token.OperationClone,
×
432
                Resource:  resource,
×
433
                Params:    v.Params,
×
434
        }, nil
×
435
}
436

437
// MultiTokenValidator is a token validator that can validate both short and long tokens
438
type MultiTokenValidator struct {
439
        ShortTokenValidator token.Validator
440
        LongTokenValidator  token.Validator
441
}
442

443
// ValidatePVC validates a PVC
444
func (mtv *MultiTokenValidator) ValidatePVC(source, target *corev1.PersistentVolumeClaim) error {
×
445
        tok, v := mtv.getTokenAndValidator(target)
×
446
        return ValidateCloneTokenPVC(tok, v, source, target)
×
447
}
×
448

449
// ValidatePopulator valades a token for a populator
450
func (mtv *MultiTokenValidator) ValidatePopulator(vcs *cdiv1.VolumeCloneSource, pvc *corev1.PersistentVolumeClaim) error {
×
451
        if vcs.Namespace == pvc.Namespace {
×
452
                return nil
×
453
        }
×
454

455
        tok, v := mtv.getTokenAndValidator(pvc)
×
456

×
457
        tokenData, err := v.Validate(tok)
×
458
        if err != nil {
×
459
                return errors.Wrap(err, "error verifying token")
×
460
        }
×
461

462
        var tokenResourceName string
×
463
        switch vcs.Spec.Source.Kind {
×
464
        case "PersistentVolumeClaim":
×
465
                tokenResourceName = "persistentvolumeclaims"
×
466
        case "VolumeSnapshot":
×
467
                tokenResourceName = "volumesnapshots"
×
468
        }
469
        srcName := vcs.Spec.Source.Name
×
470

×
471
        return validateTokenData(tokenData, vcs.Namespace, srcName, pvc.Namespace, pvc.Name, string(pvc.UID), tokenResourceName)
×
472
}
473

474
func (mtv *MultiTokenValidator) getTokenAndValidator(pvc *corev1.PersistentVolumeClaim) (string, token.Validator) {
×
475
        v := mtv.LongTokenValidator
×
476
        tok, ok := pvc.Annotations[AnnExtendedCloneToken]
×
477
        if !ok {
×
478
                // if token doesn't exist, no prob for same namespace
×
479
                tok = pvc.Annotations[AnnCloneToken]
×
480
                v = mtv.ShortTokenValidator
×
481
        }
×
482
        return tok, v
×
483
}
484

485
// NewMultiTokenValidator returns a new multi token validator
486
func NewMultiTokenValidator(key *rsa.PublicKey) *MultiTokenValidator {
×
487
        return &MultiTokenValidator{
×
488
                ShortTokenValidator: NewCloneTokenValidator(common.CloneTokenIssuer, key),
×
489
                LongTokenValidator:  NewCloneTokenValidator(common.ExtendedCloneTokenIssuer, key),
×
490
        }
×
491
}
×
492

493
// NewCloneTokenValidator returns a new token validator
494
func NewCloneTokenValidator(issuer string, key *rsa.PublicKey) token.Validator {
×
495
        return token.NewValidator(issuer, key, cloneTokenLeeway)
×
496
}
×
497

498
// GetRequestedImageSize returns the PVC requested size
499
func GetRequestedImageSize(pvc *corev1.PersistentVolumeClaim) (string, error) {
1✔
500
        pvcSize, found := pvc.Spec.Resources.Requests[corev1.ResourceStorage]
1✔
501
        if !found {
2✔
502
                return "", errors.Errorf("storage request is missing in pvc \"%s/%s\"", pvc.Namespace, pvc.Name)
1✔
503
        }
1✔
504
        return pvcSize.String(), nil
1✔
505
}
506

507
// GetVolumeMode returns the volumeMode from PVC handling default empty value
508
func GetVolumeMode(pvc *corev1.PersistentVolumeClaim) corev1.PersistentVolumeMode {
×
509
        return util.ResolveVolumeMode(pvc.Spec.VolumeMode)
×
510
}
×
511

512
// IsDataVolumeUsingDefaultStorageClass checks if the DataVolume is using the default StorageClass
513
func IsDataVolumeUsingDefaultStorageClass(dv *cdiv1.DataVolume) bool {
×
514
        return GetStorageClassFromDVSpec(dv) == nil
×
515
}
×
516

517
// GetStorageClassFromDVSpec returns the StorageClassName from DataVolume PVC or Storage spec
518
func GetStorageClassFromDVSpec(dv *cdiv1.DataVolume) *string {
×
519
        if dv.Spec.PVC != nil {
×
520
                return dv.Spec.PVC.StorageClassName
×
521
        }
×
522

523
        if dv.Spec.Storage != nil {
×
524
                return dv.Spec.Storage.StorageClassName
×
525
        }
×
526

527
        return nil
×
528
}
529

530
// getStorageClassByName looks up the storage class based on the name.
531
// If name is nil, it performs fallback to default according to the provided content type
532
// If no storage class is found, returns nil
533
func getStorageClassByName(ctx context.Context, client client.Client, name *string, contentType cdiv1.DataVolumeContentType) (*storagev1.StorageClass, error) {
1✔
534
        if name == nil {
2✔
535
                return getFallbackStorageClass(ctx, client, contentType)
1✔
536
        }
1✔
537

538
        // look up storage class by name
539
        storageClass := &storagev1.StorageClass{}
×
540
        if err := client.Get(ctx, types.NamespacedName{Name: *name}, storageClass); err != nil {
×
541
                if k8serrors.IsNotFound(err) {
×
542
                        return nil, nil
×
543
                }
×
544
                klog.V(3).Info("Unable to retrieve storage class", "storage class name", *name)
×
545
                return nil, errors.Errorf("unable to retrieve storage class %s", *name)
×
546
        }
547

548
        return storageClass, nil
×
549
}
550

551
// GetStorageClassByNameWithK8sFallback looks up the storage class based on the name
552
// If name is nil, it looks for the default k8s storage class storageclass.kubernetes.io/is-default-class
553
// If no storage class is found, returns nil
554
func GetStorageClassByNameWithK8sFallback(ctx context.Context, client client.Client, name *string) (*storagev1.StorageClass, error) {
1✔
555
        return getStorageClassByName(ctx, client, name, cdiv1.DataVolumeArchive)
1✔
556
}
1✔
557

558
// GetStorageClassByNameWithVirtFallback looks up the storage class based on the name
559
// If name is nil, it looks for the following, in this order:
560
// default kubevirt storage class (if the caller is interested) storageclass.kubevirt.io/is-default-class
561
// default k8s storage class storageclass.kubernetes.io/is-default-class
562
// If no storage class is found, returns nil
563
func GetStorageClassByNameWithVirtFallback(ctx context.Context, client client.Client, name *string, contentType cdiv1.DataVolumeContentType) (*storagev1.StorageClass, error) {
1✔
564
        return getStorageClassByName(ctx, client, name, contentType)
1✔
565
}
1✔
566

567
// getFallbackStorageClass looks for a default virt/k8s storage class according to the content type
568
// If no storage class is found, returns nil
569
func getFallbackStorageClass(ctx context.Context, client client.Client, contentType cdiv1.DataVolumeContentType) (*storagev1.StorageClass, error) {
1✔
570
        storageClasses := &storagev1.StorageClassList{}
1✔
571
        if err := client.List(ctx, storageClasses); err != nil {
1✔
572
                klog.V(3).Info("Unable to retrieve available storage classes")
×
573
                return nil, errors.New("unable to retrieve storage classes")
×
574
        }
×
575

576
        if GetContentType(contentType) == cdiv1.DataVolumeKubeVirt {
2✔
577
                if virtSc := GetPlatformDefaultStorageClass(storageClasses, AnnDefaultVirtStorageClass); virtSc != nil {
2✔
578
                        return virtSc, nil
1✔
579
                }
1✔
580
        }
581
        return GetPlatformDefaultStorageClass(storageClasses, AnnDefaultStorageClass), nil
1✔
582
}
583

584
// GetPlatformDefaultStorageClass returns the default storage class according to the provided annotation or nil if none found
585
func GetPlatformDefaultStorageClass(storageClasses *storagev1.StorageClassList, defaultAnnotationKey string) *storagev1.StorageClass {
1✔
586
        defaultClasses := []storagev1.StorageClass{}
1✔
587

1✔
588
        for _, storageClass := range storageClasses.Items {
2✔
589
                if storageClass.Annotations[defaultAnnotationKey] == "true" {
2✔
590
                        defaultClasses = append(defaultClasses, storageClass)
1✔
591
                }
1✔
592
        }
593

594
        if len(defaultClasses) == 0 {
2✔
595
                return nil
1✔
596
        }
1✔
597

598
        // Primary sort by creation timestamp, newest first
599
        // Secondary sort by class name, ascending order
600
        // Follows k8s behavior
601
        // https://github.com/kubernetes/kubernetes/blob/731068288e112c8b5af70f676296cc44661e84f4/pkg/volume/util/storageclass.go#L58-L59
602
        sort.Slice(defaultClasses, func(i, j int) bool {
2✔
603
                if defaultClasses[i].CreationTimestamp.UnixNano() == defaultClasses[j].CreationTimestamp.UnixNano() {
2✔
604
                        return defaultClasses[i].Name < defaultClasses[j].Name
1✔
605
                }
1✔
606
                return defaultClasses[i].CreationTimestamp.UnixNano() > defaultClasses[j].CreationTimestamp.UnixNano()
1✔
607
        })
608
        if len(defaultClasses) > 1 {
2✔
609
                klog.V(3).Infof("%d default StorageClasses were found, choosing: %s", len(defaultClasses), defaultClasses[0].Name)
1✔
610
        }
1✔
611

612
        return &defaultClasses[0]
1✔
613
}
614

615
// GetFilesystemOverheadForStorageClass determines the filesystem overhead defined in CDIConfig for the storageClass.
616
func GetFilesystemOverheadForStorageClass(ctx context.Context, client client.Client, storageClassName *string) (cdiv1.Percent, error) {
×
617
        if storageClassName != nil && *storageClassName == "" {
×
618
                klog.V(3).Info("No storage class name passed")
×
619
                return "0", nil
×
620
        }
×
621

622
        cdiConfig := &cdiv1.CDIConfig{}
×
623
        if err := client.Get(ctx, types.NamespacedName{Name: common.ConfigName}, cdiConfig); err != nil {
×
624
                if k8serrors.IsNotFound(err) {
×
625
                        klog.V(1).Info("CDIConfig does not exist, pod will not start until it does")
×
626
                        return "0", nil
×
627
                }
×
628
                return "0", err
×
629
        }
630

631
        targetStorageClass, err := GetStorageClassByNameWithK8sFallback(ctx, client, storageClassName)
×
632
        if err != nil || targetStorageClass == nil {
×
633
                klog.V(3).Info("Storage class", storageClassName, "not found, trying default storage class")
×
634
                targetStorageClass, err = GetStorageClassByNameWithK8sFallback(ctx, client, nil)
×
635
                if err != nil {
×
636
                        klog.V(3).Info("No default storage class found, continuing with global overhead")
×
637
                        return cdiConfig.Status.FilesystemOverhead.Global, nil
×
638
                }
×
639
        }
640

641
        if cdiConfig.Status.FilesystemOverhead == nil {
×
642
                klog.Errorf("CDIConfig filesystemOverhead used before config controller ran reconcile. Hopefully this only happens during unit testing.")
×
643
                return "0", nil
×
644
        }
×
645

646
        if targetStorageClass == nil {
×
647
                klog.V(3).Info("Storage class", storageClassName, "not found, continuing with global overhead")
×
648
                return cdiConfig.Status.FilesystemOverhead.Global, nil
×
649
        }
×
650

651
        klog.V(3).Info("target storage class for overhead", targetStorageClass.GetName())
×
652

×
653
        perStorageConfig := cdiConfig.Status.FilesystemOverhead.StorageClass
×
654

×
655
        storageClassOverhead, found := perStorageConfig[targetStorageClass.GetName()]
×
656
        if found {
×
657
                return storageClassOverhead, nil
×
658
        }
×
659

660
        return cdiConfig.Status.FilesystemOverhead.Global, nil
×
661
}
662

663
// GetDefaultPodResourceRequirements gets default pod resource requirements from cdi config status
664
func GetDefaultPodResourceRequirements(client client.Client) (*corev1.ResourceRequirements, error) {
×
665
        cdiconfig := &cdiv1.CDIConfig{}
×
666
        if err := client.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiconfig); err != nil {
×
667
                klog.Errorf("Unable to find CDI configuration, %v\n", err)
×
668
                return nil, err
×
669
        }
×
670

671
        return cdiconfig.Status.DefaultPodResourceRequirements, nil
×
672
}
673

674
// GetImagePullSecrets gets the imagePullSecrets needed to pull images from the cdi config
675
func GetImagePullSecrets(client client.Client) ([]corev1.LocalObjectReference, error) {
×
676
        cdiconfig := &cdiv1.CDIConfig{}
×
677
        if err := client.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiconfig); err != nil {
×
678
                klog.Errorf("Unable to find CDI configuration, %v\n", err)
×
679
                return nil, err
×
680
        }
×
681

682
        return cdiconfig.Status.ImagePullSecrets, nil
×
683
}
684

685
// GetPodFromPvc determines the pod associated with the pvc passed in.
686
func GetPodFromPvc(c client.Client, namespace string, pvc *corev1.PersistentVolumeClaim) (*corev1.Pod, error) {
×
687
        l, _ := labels.Parse(common.PrometheusLabelKey)
×
688
        pods := &corev1.PodList{}
×
689
        listOptions := client.ListOptions{
×
690
                LabelSelector: l,
×
691
        }
×
692
        if err := c.List(context.TODO(), pods, &listOptions); err != nil {
×
693
                return nil, err
×
694
        }
×
695

696
        pvcUID := pvc.GetUID()
×
697
        for _, pod := range pods.Items {
×
698
                if ShouldIgnorePod(&pod, pvc) {
×
699
                        continue
×
700
                }
701
                for _, or := range pod.OwnerReferences {
×
702
                        if or.UID == pvcUID {
×
703
                                return &pod, nil
×
704
                        }
×
705
                }
706

707
                // TODO: check this
708
                val, exists := pod.Labels[CloneUniqueID]
×
709
                if exists && val == string(pvcUID)+common.ClonerSourcePodNameSuffix {
×
710
                        return &pod, nil
×
711
                }
×
712
        }
713
        return nil, errors.Errorf("Unable to find pod owned by UID: %s, in namespace: %s", string(pvcUID), namespace)
×
714
}
715

716
// AddVolumeDevices returns VolumeDevice slice with one block device for pods using PV with block volume mode
717
func AddVolumeDevices() []corev1.VolumeDevice {
×
718
        volumeDevices := []corev1.VolumeDevice{
×
719
                {
×
720
                        Name:       DataVolName,
×
721
                        DevicePath: common.WriteBlockPath,
×
722
                },
×
723
        }
×
724
        return volumeDevices
×
725
}
×
726

727
// GetPodsUsingPVCs returns Pods currently using PVCs
728
func GetPodsUsingPVCs(ctx context.Context, c client.Client, namespace string, names sets.Set[string], allowReadOnly bool) ([]corev1.Pod, error) {
×
729
        pl := &corev1.PodList{}
×
730
        // hopefully using cached client here
×
731
        err := c.List(ctx, pl, &client.ListOptions{Namespace: namespace})
×
732
        if err != nil {
×
733
                return nil, err
×
734
        }
×
735

736
        var pods []corev1.Pod
×
737
        for _, pod := range pl.Items {
×
738
                if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed {
×
739
                        continue
×
740
                }
741
                for _, volume := range pod.Spec.Volumes {
×
742
                        if volume.VolumeSource.PersistentVolumeClaim != nil &&
×
743
                                names.Has(volume.PersistentVolumeClaim.ClaimName) {
×
744
                                addPod := true
×
745
                                if allowReadOnly {
×
746
                                        if !volume.VolumeSource.PersistentVolumeClaim.ReadOnly {
×
747
                                                onlyReadOnly := true
×
748
                                                for _, c := range pod.Spec.Containers {
×
749
                                                        for _, vm := range c.VolumeMounts {
×
750
                                                                if vm.Name == volume.Name && !vm.ReadOnly {
×
751
                                                                        onlyReadOnly = false
×
752
                                                                }
×
753
                                                        }
754
                                                        for _, vm := range c.VolumeDevices {
×
755
                                                                if vm.Name == volume.Name {
×
756
                                                                        // Node level rw mount and container can't mount block device ro
×
757
                                                                        onlyReadOnly = false
×
758
                                                                }
×
759
                                                        }
760
                                                }
761
                                                if onlyReadOnly {
×
762
                                                        // no rw mounts
×
763
                                                        addPod = false
×
764
                                                }
×
765
                                        } else {
×
766
                                                // all mounts must be ro
×
767
                                                addPod = false
×
768
                                        }
×
769
                                        if strings.HasSuffix(pod.Name, common.ClonerSourcePodNameSuffix) && pod.Labels != nil &&
×
770
                                                pod.Labels[common.CDIComponentLabel] == common.ClonerSourcePodName {
×
771
                                                // Host assisted clone source pod only reads from source
×
772
                                                // But some drivers disallow mounting a block PVC ReadOnly
×
773
                                                addPod = false
×
774
                                        }
×
775
                                }
776
                                if addPod {
×
777
                                        pods = append(pods, pod)
×
778
                                        break
×
779
                                }
780
                        }
781
                }
782
        }
783

784
        return pods, nil
×
785
}
786

787
// GetWorkloadNodePlacement extracts the workload-specific nodeplacement values from the CDI CR
788
func GetWorkloadNodePlacement(ctx context.Context, c client.Client) (*sdkapi.NodePlacement, error) {
×
789
        cr, err := GetActiveCDI(ctx, c)
×
790
        if err != nil {
×
791
                return nil, err
×
792
        }
×
793

794
        if cr == nil {
×
795
                return nil, fmt.Errorf("no active CDI")
×
796
        }
×
797

798
        return &cr.Spec.Workloads, nil
×
799
}
800

801
// GetActiveCDI returns the active CDI CR
802
func GetActiveCDI(ctx context.Context, c client.Client) (*cdiv1.CDI, error) {
1✔
803
        crList := &cdiv1.CDIList{}
1✔
804
        if err := c.List(ctx, crList, &client.ListOptions{}); err != nil {
1✔
805
                return nil, err
×
806
        }
×
807

808
        if len(crList.Items) == 0 {
2✔
809
                return nil, nil
1✔
810
        }
1✔
811

812
        if len(crList.Items) == 1 {
2✔
813
                return &crList.Items[0], nil
1✔
814
        }
1✔
815

816
        var activeResources []cdiv1.CDI
1✔
817
        for _, cr := range crList.Items {
2✔
818
                if cr.Status.Phase != sdkapi.PhaseError {
2✔
819
                        activeResources = append(activeResources, cr)
1✔
820
                }
1✔
821
        }
822

823
        if len(activeResources) != 1 {
2✔
824
                return nil, fmt.Errorf("invalid number of active CDI resources: %d", len(activeResources))
1✔
825
        }
1✔
826

827
        return &activeResources[0], nil
1✔
828
}
829

830
// IsPopulated returns if the passed in PVC has been populated according to the rules outlined in pkg/apis/core/<version>/utils.go
831
func IsPopulated(pvc *corev1.PersistentVolumeClaim, c client.Client) (bool, error) {
×
832
        return cdiv1utils.IsPopulated(pvc, func(name, namespace string) (*cdiv1.DataVolume, error) {
×
833
                dv := &cdiv1.DataVolume{}
×
834
                err := c.Get(context.TODO(), types.NamespacedName{Name: name, Namespace: namespace}, dv)
×
835
                return dv, err
×
836
        })
×
837
}
838

839
// GetPreallocation returns the preallocation setting for the specified object (DV or VolumeImportSource), falling back to StorageClass and global setting (in this order)
840
func GetPreallocation(ctx context.Context, client client.Client, preallocation *bool) bool {
×
841
        // First, the DV's preallocation
×
842
        if preallocation != nil {
×
843
                return *preallocation
×
844
        }
×
845

846
        cdiconfig := &cdiv1.CDIConfig{}
×
847
        if err := client.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiconfig); err != nil {
×
848
                klog.Errorf("Unable to find CDI configuration, %v\n", err)
×
849
                return defaultPreallocation
×
850
        }
×
851

852
        return cdiconfig.Status.Preallocation
×
853
}
854

855
// ImmediateBindingRequested returns if an object has the ImmediateBinding annotation
856
func ImmediateBindingRequested(obj metav1.Object) bool {
×
857
        _, isImmediateBindingRequested := obj.GetAnnotations()[AnnImmediateBinding]
×
858
        return isImmediateBindingRequested
×
859
}
×
860

861
// GetPriorityClass gets PVC priority class
862
func GetPriorityClass(pvc *corev1.PersistentVolumeClaim) string {
×
863
        anno := pvc.GetAnnotations()
×
864
        return anno[AnnPriorityClassName]
×
865
}
×
866

867
// ShouldDeletePod returns whether the PVC workload pod should be deleted
868
func ShouldDeletePod(pvc *corev1.PersistentVolumeClaim) bool {
×
869
        return pvc.GetAnnotations()[AnnPodRetainAfterCompletion] != "true" || pvc.GetAnnotations()[AnnRequiresScratch] == "true" || pvc.GetAnnotations()[AnnRequiresDirectIO] == "true" || pvc.DeletionTimestamp != nil
×
870
}
×
871

872
// AddFinalizer adds a finalizer to a resource
873
func AddFinalizer(obj metav1.Object, name string) {
×
874
        if HasFinalizer(obj, name) {
×
875
                return
×
876
        }
×
877

878
        obj.SetFinalizers(append(obj.GetFinalizers(), name))
×
879
}
880

881
// RemoveFinalizer removes a finalizer from a resource
882
func RemoveFinalizer(obj metav1.Object, name string) {
×
883
        if !HasFinalizer(obj, name) {
×
884
                return
×
885
        }
×
886

887
        var finalizers []string
×
888
        for _, f := range obj.GetFinalizers() {
×
889
                if f != name {
×
890
                        finalizers = append(finalizers, f)
×
891
                }
×
892
        }
893

894
        obj.SetFinalizers(finalizers)
×
895
}
896

897
// HasFinalizer returns true if a resource has a specific finalizer
898
func HasFinalizer(object metav1.Object, value string) bool {
×
899
        for _, f := range object.GetFinalizers() {
×
900
                if f == value {
×
901
                        return true
×
902
                }
×
903
        }
904
        return false
×
905
}
906

907
// ValidateCloneTokenPVC validates clone token for source and target PVCs
908
func ValidateCloneTokenPVC(t string, v token.Validator, source, target *corev1.PersistentVolumeClaim) error {
×
909
        if source.Namespace == target.Namespace {
×
910
                return nil
×
911
        }
×
912

913
        tokenData, err := v.Validate(t)
×
914
        if err != nil {
×
915
                return errors.Wrap(err, "error verifying token")
×
916
        }
×
917

918
        tokenResourceName := getTokenResourceNamePvc(source)
×
919
        srcName := getSourceNamePvc(source)
×
920

×
921
        return validateTokenData(tokenData, source.Namespace, srcName, target.Namespace, target.Name, string(target.UID), tokenResourceName)
×
922
}
923

924
// ValidateCloneTokenDV validates clone token for DV
925
func ValidateCloneTokenDV(validator token.Validator, dv *cdiv1.DataVolume) error {
×
926
        _, sourceName, sourceNamespace := GetCloneSourceInfo(dv)
×
927
        if sourceNamespace == "" || sourceNamespace == dv.Namespace {
×
928
                return nil
×
929
        }
×
930

931
        tok, ok := dv.Annotations[AnnCloneToken]
×
932
        if !ok {
×
933
                return errors.New("clone token missing")
×
934
        }
×
935

936
        tokenData, err := validator.Validate(tok)
×
937
        if err != nil {
×
938
                return errors.Wrap(err, "error verifying token")
×
939
        }
×
940

941
        tokenResourceName := getTokenResourceNameDataVolume(dv.Spec.Source)
×
942
        if tokenResourceName == "" {
×
943
                return errors.New("token resource name empty, can't verify properly")
×
944
        }
×
945

946
        return validateTokenData(tokenData, sourceNamespace, sourceName, dv.Namespace, dv.Name, "", tokenResourceName)
×
947
}
948

949
func getTokenResourceNameDataVolume(source *cdiv1.DataVolumeSource) string {
×
950
        if source.PVC != nil {
×
951
                return "persistentvolumeclaims"
×
952
        } else if source.Snapshot != nil {
×
953
                return "volumesnapshots"
×
954
        }
×
955

956
        return ""
×
957
}
958

959
func getTokenResourceNamePvc(sourcePvc *corev1.PersistentVolumeClaim) string {
×
960
        if v, ok := sourcePvc.Labels[common.CDIComponentLabel]; ok && v == common.CloneFromSnapshotFallbackPVCCDILabel {
×
961
                return "volumesnapshots"
×
962
        }
×
963

964
        return "persistentvolumeclaims"
×
965
}
966

967
func getSourceNamePvc(sourcePvc *corev1.PersistentVolumeClaim) string {
×
968
        if v, ok := sourcePvc.Labels[common.CDIComponentLabel]; ok && v == common.CloneFromSnapshotFallbackPVCCDILabel {
×
969
                if sourcePvc.Spec.DataSourceRef != nil {
×
970
                        return sourcePvc.Spec.DataSourceRef.Name
×
971
                }
×
972
        }
973

974
        return sourcePvc.Name
×
975
}
976

977
func validateTokenData(tokenData *token.Payload, srcNamespace, srcName, targetNamespace, targetName, targetUID, tokenResourceName string) error {
×
978
        uid := tokenData.Params["uid"]
×
979
        if tokenData.Operation != token.OperationClone ||
×
980
                tokenData.Name != srcName ||
×
981
                tokenData.Namespace != srcNamespace ||
×
982
                tokenData.Resource.Resource != tokenResourceName ||
×
983
                tokenData.Params["targetNamespace"] != targetNamespace ||
×
984
                tokenData.Params["targetName"] != targetName ||
×
985
                (uid != "" && uid != targetUID) {
×
986
                return errors.New("invalid token")
×
987
        }
×
988

989
        return nil
×
990
}
991

992
// IsSnapshotValidForClone returns an error if the passed snapshot is not valid for cloning
993
func IsSnapshotValidForClone(sourceSnapshot *snapshotv1.VolumeSnapshot) error {
×
994
        if sourceSnapshot.Status == nil {
×
995
                return fmt.Errorf("no status on source snapshot yet")
×
996
        }
×
997
        if !IsSnapshotReady(sourceSnapshot) {
×
998
                klog.V(3).Info("snapshot not ReadyToUse, while we allow this, probably going to be an issue going forward", "namespace", sourceSnapshot.Namespace, "name", sourceSnapshot.Name)
×
999
        }
×
1000
        if sourceSnapshot.Status.Error != nil {
×
1001
                errMessage := "no details"
×
1002
                if msg := sourceSnapshot.Status.Error.Message; msg != nil {
×
1003
                        errMessage = *msg
×
1004
                }
×
1005
                return fmt.Errorf("snapshot in error state with msg: %s", errMessage)
×
1006
        }
1007
        if sourceSnapshot.Spec.VolumeSnapshotClassName == nil ||
×
1008
                *sourceSnapshot.Spec.VolumeSnapshotClassName == "" {
×
1009
                return fmt.Errorf("snapshot %s/%s does not have volume snap class populated, can't clone", sourceSnapshot.Name, sourceSnapshot.Namespace)
×
1010
        }
×
1011
        return nil
×
1012
}
1013

1014
// AddAnnotation adds an annotation to an object
1015
func AddAnnotation(obj metav1.Object, key, value string) {
1✔
1016
        if obj.GetAnnotations() == nil {
2✔
1017
                obj.SetAnnotations(make(map[string]string))
1✔
1018
        }
1✔
1019
        obj.GetAnnotations()[key] = value
1✔
1020
}
1021

1022
// AddLabel adds a label to an object
1023
func AddLabel(obj metav1.Object, key, value string) {
1✔
1024
        if obj.GetLabels() == nil {
2✔
1025
                obj.SetLabels(make(map[string]string))
1✔
1026
        }
1✔
1027
        obj.GetLabels()[key] = value
1✔
1028
}
1029

1030
// HandleFailedPod handles pod-creation errors and updates the pod's PVC without providing sensitive information
1031
func HandleFailedPod(err error, podName string, pvc *corev1.PersistentVolumeClaim, recorder record.EventRecorder, c client.Client) error {
×
1032
        if err == nil {
×
1033
                return nil
×
1034
        }
×
1035
        // Generic reason and msg to avoid providing sensitive information
1036
        reason := ErrStartingPod
×
1037
        msg := fmt.Sprintf(MessageErrStartingPod, podName)
×
1038

×
1039
        // Error handling to fine-tune the event with pertinent info
×
1040
        if ErrQuotaExceeded(err) {
×
1041
                reason = ErrExceededQuota
×
1042
        }
×
1043

1044
        recorder.Event(pvc, corev1.EventTypeWarning, reason, msg)
×
1045

×
1046
        if isCloneSourcePod := CreateCloneSourcePodName(pvc) == podName; isCloneSourcePod {
×
1047
                AddAnnotation(pvc, AnnSourceRunningCondition, "false")
×
1048
                AddAnnotation(pvc, AnnSourceRunningConditionReason, reason)
×
1049
                AddAnnotation(pvc, AnnSourceRunningConditionMessage, msg)
×
1050
        } else {
×
1051
                AddAnnotation(pvc, AnnRunningCondition, "false")
×
1052
                AddAnnotation(pvc, AnnRunningConditionReason, reason)
×
1053
                AddAnnotation(pvc, AnnRunningConditionMessage, msg)
×
1054
        }
×
1055

1056
        AddAnnotation(pvc, AnnPodPhase, string(corev1.PodFailed))
×
1057
        if err := c.Update(context.TODO(), pvc); err != nil {
×
1058
                return err
×
1059
        }
×
1060

1061
        return err
×
1062
}
1063

1064
// GetSource returns the source string which determines the type of source. If no source or invalid source found, default to http
1065
func GetSource(pvc *corev1.PersistentVolumeClaim) string {
×
1066
        source, found := pvc.Annotations[AnnSource]
×
1067
        if !found {
×
1068
                source = ""
×
1069
        }
×
1070
        switch source {
×
1071
        case
1072
                SourceHTTP,
1073
                SourceS3,
1074
                SourceGCS,
1075
                SourceGlance,
1076
                SourceNone,
1077
                SourceRegistry,
1078
                SourceImageio,
1079
                SourceVDDK:
×
1080
        default:
×
1081
                source = SourceHTTP
×
1082
        }
1083
        return source
×
1084
}
1085

1086
// GetEndpoint returns the endpoint string which contains the full path URI of the target object to be copied.
1087
func GetEndpoint(pvc *corev1.PersistentVolumeClaim) (string, error) {
×
1088
        ep, found := pvc.Annotations[AnnEndpoint]
×
1089
        if !found || ep == "" {
×
1090
                verb := "empty"
×
1091
                if !found {
×
1092
                        verb = "missing"
×
1093
                }
×
1094
                return ep, errors.Errorf("annotation %q in pvc \"%s/%s\" is %s", AnnEndpoint, pvc.Namespace, pvc.Name, verb)
×
1095
        }
1096
        return ep, nil
×
1097
}
1098

1099
// AddImportVolumeMounts is being called for pods using PV with filesystem volume mode
1100
func AddImportVolumeMounts() []corev1.VolumeMount {
×
1101
        volumeMounts := []corev1.VolumeMount{
×
1102
                {
×
1103
                        Name:      DataVolName,
×
1104
                        MountPath: common.ImporterDataDir,
×
1105
                },
×
1106
        }
×
1107
        return volumeMounts
×
1108
}
×
1109

1110
// GetEffectiveStorageResources returns the maximum of the passed storageResources and the storageProfile minimumSupportedPVCSize.
1111
// If the passed storageResources has no size, it is returned as-is.
1112
func GetEffectiveStorageResources(ctx context.Context, client client.Client, storageResources corev1.VolumeResourceRequirements,
1113
        storageClassName *string, contentType cdiv1.DataVolumeContentType, log logr.Logger) (corev1.VolumeResourceRequirements, error) {
×
1114
        sc, err := GetStorageClassByNameWithVirtFallback(ctx, client, storageClassName, contentType)
×
1115
        if err != nil || sc == nil {
×
1116
                return storageResources, err
×
1117
        }
×
1118

1119
        requestedSize, hasSize := storageResources.Requests[corev1.ResourceStorage]
×
1120
        if !hasSize {
×
1121
                return storageResources, nil
×
1122
        }
×
1123

1124
        if requestedSize, err = GetEffectiveVolumeSize(ctx, client, requestedSize, sc.Name, &log); err != nil {
×
1125
                return storageResources, err
×
1126
        }
×
1127

1128
        return corev1.VolumeResourceRequirements{
×
1129
                Requests: corev1.ResourceList{
×
1130
                        corev1.ResourceStorage: requestedSize,
×
1131
                },
×
1132
        }, nil
×
1133
}
1134

1135
// GetEffectiveVolumeSize returns the maximum of the passed requestedSize and the storageProfile minimumSupportedPVCSize.
1136
func GetEffectiveVolumeSize(ctx context.Context, client client.Client, requestedSize resource.Quantity, storageClassName string, log *logr.Logger) (resource.Quantity, error) {
×
1137
        storageProfile := &cdiv1.StorageProfile{}
×
1138
        if err := client.Get(ctx, types.NamespacedName{Name: storageClassName}, storageProfile); err != nil {
×
1139
                return requestedSize, IgnoreNotFound(err)
×
1140
        }
×
1141

1142
        if val, exists := storageProfile.Annotations[AnnMinimumSupportedPVCSize]; exists {
×
1143
                if minSize, err := resource.ParseQuantity(val); err == nil {
×
1144
                        if requestedSize.Cmp(minSize) == -1 {
×
1145
                                return minSize, nil
×
1146
                        }
×
1147
                } else if log != nil {
×
1148
                        log.V(1).Info("Invalid minimum PVC size in annotation", "value", val, "error", err)
×
1149
                }
×
1150
        }
1151

1152
        return requestedSize, nil
×
1153
}
1154

1155
// ValidateRequestedCloneSize validates the clone size requirements on block
1156
func ValidateRequestedCloneSize(sourceResources, targetResources corev1.VolumeResourceRequirements) error {
×
1157
        sourceRequest, hasSource := sourceResources.Requests[corev1.ResourceStorage]
×
1158
        targetRequest, hasTarget := targetResources.Requests[corev1.ResourceStorage]
×
1159
        if !hasSource || !hasTarget {
×
1160
                return errors.New("source/target missing storage resource requests")
×
1161
        }
×
1162

1163
        // Verify that the target PVC size is equal or larger than the source.
1164
        if sourceRequest.Value() > targetRequest.Value() {
×
1165
                return errors.Errorf("target resources requests storage size is smaller than the source %d < %d", targetRequest.Value(), sourceRequest.Value())
×
1166
        }
×
1167
        return nil
×
1168
}
1169

1170
// CreateCloneSourcePodName creates clone source pod name
1171
func CreateCloneSourcePodName(targetPvc *corev1.PersistentVolumeClaim) string {
×
1172
        return string(targetPvc.GetUID()) + common.ClonerSourcePodNameSuffix
×
1173
}
×
1174

1175
// IsPVCComplete returns true if a PVC is in 'Succeeded' phase, false if not
1176
func IsPVCComplete(pvc *corev1.PersistentVolumeClaim) bool {
×
1177
        if pvc != nil {
×
1178
                phase, exists := pvc.ObjectMeta.Annotations[AnnPodPhase]
×
1179
                return exists && (phase == string(corev1.PodSucceeded))
×
1180
        }
×
1181
        return false
×
1182
}
1183

1184
// IsMultiStageImportInProgress returns true when a PVC is being part of an ongoing multi-stage import
1185
func IsMultiStageImportInProgress(pvc *corev1.PersistentVolumeClaim) bool {
×
1186
        if pvc != nil {
×
1187
                multiStageImport := metav1.HasAnnotation(pvc.ObjectMeta, AnnCurrentCheckpoint)
×
1188
                multiStageAlreadyDone := metav1.HasAnnotation(pvc.ObjectMeta, AnnMultiStageImportDone)
×
1189
                return multiStageImport && !multiStageAlreadyDone
×
1190
        }
×
1191
        return false
×
1192
}
1193

1194
// SetRestrictedSecurityContext sets the pod security params to be compatible with restricted PSA
1195
func SetRestrictedSecurityContext(podSpec *corev1.PodSpec) {
×
1196
        hasVolumeMounts := false
×
1197
        for _, containers := range [][]corev1.Container{podSpec.InitContainers, podSpec.Containers} {
×
1198
                for i := range containers {
×
1199
                        container := &containers[i]
×
1200
                        if container.SecurityContext == nil {
×
1201
                                container.SecurityContext = &corev1.SecurityContext{}
×
1202
                        }
×
1203
                        container.SecurityContext.Capabilities = &corev1.Capabilities{
×
1204
                                Drop: []corev1.Capability{
×
1205
                                        "ALL",
×
1206
                                },
×
1207
                        }
×
1208
                        container.SecurityContext.SeccompProfile = &corev1.SeccompProfile{
×
1209
                                Type: corev1.SeccompProfileTypeRuntimeDefault,
×
1210
                        }
×
1211
                        container.SecurityContext.AllowPrivilegeEscalation = ptr.To[bool](false)
×
1212
                        container.SecurityContext.RunAsNonRoot = ptr.To[bool](true)
×
1213
                        container.SecurityContext.RunAsUser = ptr.To[int64](common.QemuSubGid)
×
1214
                        if len(container.VolumeMounts) > 0 {
×
1215
                                hasVolumeMounts = true
×
1216
                        }
×
1217
                }
1218
        }
1219

1220
        if podSpec.SecurityContext == nil {
×
1221
                podSpec.SecurityContext = &corev1.PodSecurityContext{}
×
1222
        }
×
1223
        // Some tools like istio inject containers and thus rely on a pod level seccomp profile being specified
1224
        podSpec.SecurityContext.SeccompProfile = &corev1.SeccompProfile{
×
1225
                Type: corev1.SeccompProfileTypeRuntimeDefault,
×
1226
        }
×
1227
        if hasVolumeMounts {
×
1228
                podSpec.SecurityContext.FSGroup = ptr.To[int64](common.QemuSubGid)
×
1229
        }
×
1230
}
1231

1232
// SetNodeNameIfPopulator sets NodeName in a pod spec when the PVC is being handled by a CDI volume populator
1233
func SetNodeNameIfPopulator(pvc *corev1.PersistentVolumeClaim, podSpec *corev1.PodSpec) {
×
1234
        _, isPopulator := pvc.Annotations[AnnPopulatorKind]
×
1235
        nodeName := pvc.Annotations[AnnSelectedNode]
×
1236
        if isPopulator && nodeName != "" {
×
1237
                podSpec.NodeName = nodeName
×
1238
        }
×
1239
}
1240

1241
// CreatePvc creates PVC
1242
func CreatePvc(name, ns string, annotations, labels map[string]string) *corev1.PersistentVolumeClaim {
1✔
1243
        return CreatePvcInStorageClass(name, ns, nil, annotations, labels, corev1.ClaimBound)
1✔
1244
}
1✔
1245

1246
// CreatePvcInStorageClass creates PVC with storgae class
1247
func CreatePvcInStorageClass(name, ns string, storageClassName *string, annotations, labels map[string]string, phase corev1.PersistentVolumeClaimPhase) *corev1.PersistentVolumeClaim {
1✔
1248
        pvc := &corev1.PersistentVolumeClaim{
1✔
1249
                ObjectMeta: metav1.ObjectMeta{
1✔
1250
                        Name:        name,
1✔
1251
                        Namespace:   ns,
1✔
1252
                        Annotations: annotations,
1✔
1253
                        Labels:      labels,
1✔
1254
                        UID:         types.UID(ns + "-" + name),
1✔
1255
                },
1✔
1256
                Spec: corev1.PersistentVolumeClaimSpec{
1✔
1257
                        AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadOnlyMany, corev1.ReadWriteOnce},
1✔
1258
                        Resources: corev1.VolumeResourceRequirements{
1✔
1259
                                Requests: corev1.ResourceList{
1✔
1260
                                        corev1.ResourceStorage: resource.MustParse("1G"),
1✔
1261
                                },
1✔
1262
                        },
1✔
1263
                        StorageClassName: storageClassName,
1✔
1264
                },
1✔
1265
                Status: corev1.PersistentVolumeClaimStatus{
1✔
1266
                        Phase: phase,
1✔
1267
                },
1✔
1268
        }
1✔
1269
        pvc.Status.Capacity = pvc.Spec.Resources.Requests.DeepCopy()
1✔
1270
        if pvc.Status.Phase == corev1.ClaimBound {
2✔
1271
                pvc.Spec.VolumeName = "pv-" + string(pvc.UID)
1✔
1272
        }
1✔
1273
        return pvc
1✔
1274
}
1275

1276
// GetAPIServerKey returns API server RSA key
1277
func GetAPIServerKey() *rsa.PrivateKey {
×
1278
        apiServerKeyOnce.Do(func() {
×
1279
                apiServerKey, _ = rsa.GenerateKey(rand.Reader, 2048)
×
1280
        })
×
1281
        return apiServerKey
×
1282
}
1283

1284
// CreateStorageClass creates storage class CR
1285
func CreateStorageClass(name string, annotations map[string]string) *storagev1.StorageClass {
1✔
1286
        return &storagev1.StorageClass{
1✔
1287
                ObjectMeta: metav1.ObjectMeta{
1✔
1288
                        Name:        name,
1✔
1289
                        Annotations: annotations,
1✔
1290
                },
1✔
1291
        }
1✔
1292
}
1✔
1293

1294
// CreateImporterTestPod creates importer test pod CR
1295
func CreateImporterTestPod(pvc *corev1.PersistentVolumeClaim, dvname string, scratchPvc *corev1.PersistentVolumeClaim) *corev1.Pod {
×
1296
        // importer pod name contains the pvc name
×
1297
        podName := fmt.Sprintf("%s-%s", common.ImporterPodName, pvc.Name)
×
1298

×
1299
        blockOwnerDeletion := true
×
1300
        isController := true
×
1301

×
1302
        volumes := []corev1.Volume{
×
1303
                {
×
1304
                        Name: dvname,
×
1305
                        VolumeSource: corev1.VolumeSource{
×
1306
                                PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
×
1307
                                        ClaimName: pvc.Name,
×
1308
                                        ReadOnly:  false,
×
1309
                                },
×
1310
                        },
×
1311
                },
×
1312
        }
×
1313

×
1314
        if scratchPvc != nil {
×
1315
                volumes = append(volumes, corev1.Volume{
×
1316
                        Name: ScratchVolName,
×
1317
                        VolumeSource: corev1.VolumeSource{
×
1318
                                PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
×
1319
                                        ClaimName: scratchPvc.Name,
×
1320
                                        ReadOnly:  false,
×
1321
                                },
×
1322
                        },
×
1323
                })
×
1324
        }
×
1325

1326
        pod := &corev1.Pod{
×
1327
                TypeMeta: metav1.TypeMeta{
×
1328
                        Kind:       "Pod",
×
1329
                        APIVersion: "v1",
×
1330
                },
×
1331
                ObjectMeta: metav1.ObjectMeta{
×
1332
                        Name:      podName,
×
1333
                        Namespace: pvc.Namespace,
×
1334
                        Annotations: map[string]string{
×
1335
                                AnnCreatedBy: "yes",
×
1336
                        },
×
1337
                        Labels: map[string]string{
×
1338
                                common.CDILabelKey:        common.CDILabelValue,
×
1339
                                common.CDIComponentLabel:  common.ImporterPodName,
×
1340
                                common.PrometheusLabelKey: common.PrometheusLabelValue,
×
1341
                        },
×
1342
                        OwnerReferences: []metav1.OwnerReference{
×
1343
                                {
×
1344
                                        APIVersion:         "v1",
×
1345
                                        Kind:               "PersistentVolumeClaim",
×
1346
                                        Name:               pvc.Name,
×
1347
                                        UID:                pvc.GetUID(),
×
1348
                                        BlockOwnerDeletion: &blockOwnerDeletion,
×
1349
                                        Controller:         &isController,
×
1350
                                },
×
1351
                        },
×
1352
                },
×
1353
                Spec: corev1.PodSpec{
×
1354
                        Containers: []corev1.Container{
×
1355
                                {
×
1356
                                        Name:            common.ImporterPodName,
×
1357
                                        Image:           "test/myimage",
×
1358
                                        ImagePullPolicy: corev1.PullPolicy("Always"),
×
1359
                                        Args:            []string{"-v=5"},
×
1360
                                        Ports: []corev1.ContainerPort{
×
1361
                                                {
×
1362
                                                        Name:          "metrics",
×
1363
                                                        ContainerPort: 8443,
×
1364
                                                        Protocol:      corev1.ProtocolTCP,
×
1365
                                                },
×
1366
                                        },
×
1367
                                },
×
1368
                        },
×
1369
                        RestartPolicy: corev1.RestartPolicyOnFailure,
×
1370
                        Volumes:       volumes,
×
1371
                },
×
1372
        }
×
1373

×
1374
        ep, _ := GetEndpoint(pvc)
×
1375
        source := GetSource(pvc)
×
1376
        contentType := GetPVCContentType(pvc)
×
1377
        imageSize, _ := GetRequestedImageSize(pvc)
×
1378
        volumeMode := GetVolumeMode(pvc)
×
1379

×
1380
        env := []corev1.EnvVar{
×
1381
                {
×
1382
                        Name:  common.ImporterSource,
×
1383
                        Value: source,
×
1384
                },
×
1385
                {
×
1386
                        Name:  common.ImporterEndpoint,
×
1387
                        Value: ep,
×
1388
                },
×
1389
                {
×
1390
                        Name:  common.ImporterContentType,
×
1391
                        Value: string(contentType),
×
1392
                },
×
1393
                {
×
1394
                        Name:  common.ImporterImageSize,
×
1395
                        Value: imageSize,
×
1396
                },
×
1397
                {
×
1398
                        Name:  common.OwnerUID,
×
1399
                        Value: string(pvc.UID),
×
1400
                },
×
1401
                {
×
1402
                        Name:  common.InsecureTLSVar,
×
1403
                        Value: "false",
×
1404
                },
×
1405
        }
×
1406
        pod.Spec.Containers[0].Env = env
×
1407
        if volumeMode == corev1.PersistentVolumeBlock {
×
1408
                pod.Spec.Containers[0].VolumeDevices = AddVolumeDevices()
×
1409
        } else {
×
1410
                pod.Spec.Containers[0].VolumeMounts = AddImportVolumeMounts()
×
1411
        }
×
1412

1413
        if scratchPvc != nil {
×
1414
                pod.Spec.Containers[0].VolumeMounts = append(pod.Spec.Containers[0].VolumeMounts, corev1.VolumeMount{
×
1415
                        Name:      ScratchVolName,
×
1416
                        MountPath: common.ScratchDataDir,
×
1417
                })
×
1418
        }
×
1419

1420
        return pod
×
1421
}
1422

1423
// CreateStorageClassWithProvisioner creates CR of storage class with provisioner
1424
func CreateStorageClassWithProvisioner(name string, annotations, labels map[string]string, provisioner string) *storagev1.StorageClass {
×
1425
        return &storagev1.StorageClass{
×
1426
                Provisioner: provisioner,
×
1427
                ObjectMeta: metav1.ObjectMeta{
×
1428
                        Name:        name,
×
1429
                        Annotations: annotations,
×
1430
                        Labels:      labels,
×
1431
                },
×
1432
        }
×
1433
}
×
1434

1435
// CreateClient creates a fake client
1436
func CreateClient(objs ...runtime.Object) client.Client {
1✔
1437
        s := scheme.Scheme
1✔
1438
        _ = cdiv1.AddToScheme(s)
1✔
1439
        _ = corev1.AddToScheme(s)
1✔
1440
        _ = storagev1.AddToScheme(s)
1✔
1441
        _ = ocpconfigv1.Install(s)
1✔
1442

1✔
1443
        return fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objs...).Build()
1✔
1444
}
1✔
1445

1446
// ErrQuotaExceeded checks if the error is an exceeded quota error, i.e. its
// message contains "exceeded quota:". A nil error is never a quota error.
func ErrQuotaExceeded(err error) bool {
	// Guard against nil so callers can pass the error through without a
	// preceding check; err.Error() on a nil interface would panic.
	if err == nil {
		return false
	}
	return strings.Contains(err.Error(), "exceeded quota:")
}
×
1450

1451
// GetContentType returns the content type. If invalid or not set, default to kubevirt
1452
func GetContentType(contentType cdiv1.DataVolumeContentType) cdiv1.DataVolumeContentType {
1✔
1453
        switch contentType {
1✔
1454
        case
1455
                cdiv1.DataVolumeKubeVirt,
1456
                cdiv1.DataVolumeArchive:
1✔
1457
        default:
×
1458
                // TODO - shouldn't archive be the default?
×
1459
                contentType = cdiv1.DataVolumeKubeVirt
×
1460
        }
1461
        return contentType
1✔
1462
}
1463

1464
// GetPVCContentType returns the content type of the source image. If invalid or not set, default to kubevirt
1465
func GetPVCContentType(pvc *corev1.PersistentVolumeClaim) cdiv1.DataVolumeContentType {
×
1466
        contentType, found := pvc.Annotations[AnnContentType]
×
1467
        if !found {
×
1468
                // TODO - shouldn't archive be the default?
×
1469
                return cdiv1.DataVolumeKubeVirt
×
1470
        }
×
1471

1472
        return GetContentType(cdiv1.DataVolumeContentType(contentType))
×
1473
}
1474

1475
// GetNamespace returns namespace when it is set and defaultNamespace otherwise.
func GetNamespace(namespace, defaultNamespace string) string {
	if namespace != "" {
		return namespace
	}
	return defaultNamespace
}
1482

1483
// IsErrCacheNotStarted checked is the error is of cache not started
1484
func IsErrCacheNotStarted(err error) bool {
×
1485
        target := &runtimecache.ErrCacheNotStarted{}
×
1486
        return errors.As(err, &target)
×
1487
}
×
1488

1489
// NewImportDataVolume returns new import DataVolume CR
1490
func NewImportDataVolume(name string) *cdiv1.DataVolume {
×
1491
        return &cdiv1.DataVolume{
×
1492
                TypeMeta: metav1.TypeMeta{APIVersion: cdiv1.SchemeGroupVersion.String()},
×
1493
                ObjectMeta: metav1.ObjectMeta{
×
1494
                        Name:      name,
×
1495
                        Namespace: metav1.NamespaceDefault,
×
1496
                        UID:       types.UID(metav1.NamespaceDefault + "-" + name),
×
1497
                },
×
1498
                Spec: cdiv1.DataVolumeSpec{
×
1499
                        Source: &cdiv1.DataVolumeSource{
×
1500
                                HTTP: &cdiv1.DataVolumeSourceHTTP{
×
1501
                                        URL: "http://example.com/data",
×
1502
                                },
×
1503
                        },
×
1504
                        PVC: &corev1.PersistentVolumeClaimSpec{
×
1505
                                AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
×
1506
                        },
×
1507
                        PriorityClassName: "p0",
×
1508
                },
×
1509
        }
×
1510
}
×
1511

1512
// GetCloneSourceInfo returns the type, name and namespace of the cloning source
1513
func GetCloneSourceInfo(dv *cdiv1.DataVolume) (sourceType, sourceName, sourceNamespace string) {
×
1514
        // Cloning sources are mutually exclusive
×
1515
        if dv.Spec.Source.PVC != nil {
×
1516
                return "pvc", dv.Spec.Source.PVC.Name, dv.Spec.Source.PVC.Namespace
×
1517
        }
×
1518

1519
        if dv.Spec.Source.Snapshot != nil {
×
1520
                return "snapshot", dv.Spec.Source.Snapshot.Name, dv.Spec.Source.Snapshot.Namespace
×
1521
        }
×
1522

1523
        return "", "", ""
×
1524
}
1525

1526
// IsWaitForFirstConsumerEnabled tells us if we should respect "real" WFFC behavior or just let our worker pods randomly spawn
1527
func IsWaitForFirstConsumerEnabled(obj metav1.Object, gates featuregates.FeatureGates) (bool, error) {
×
1528
        // when PVC requests immediateBinding it cannot honor wffc logic
×
1529
        isImmediateBindingRequested := ImmediateBindingRequested(obj)
×
1530
        pvcHonorWaitForFirstConsumer := !isImmediateBindingRequested
×
1531
        globalHonorWaitForFirstConsumer, err := gates.HonorWaitForFirstConsumerEnabled()
×
1532
        if err != nil {
×
1533
                return false, err
×
1534
        }
×
1535

1536
        return pvcHonorWaitForFirstConsumer && globalHonorWaitForFirstConsumer, nil
×
1537
}
1538

1539
// AddImmediateBindingAnnotationIfWFFCDisabled adds the immediateBinding annotation if wffc feature gate is disabled
1540
func AddImmediateBindingAnnotationIfWFFCDisabled(obj metav1.Object, gates featuregates.FeatureGates) error {
×
1541
        globalHonorWaitForFirstConsumer, err := gates.HonorWaitForFirstConsumerEnabled()
×
1542
        if err != nil {
×
1543
                return err
×
1544
        }
×
1545
        if !globalHonorWaitForFirstConsumer {
×
1546
                AddAnnotation(obj, AnnImmediateBinding, "")
×
1547
        }
×
1548
        return nil
×
1549
}
1550

1551
// InflateSizeWithOverhead inflates a storage size with proper overhead calculations
1552
func InflateSizeWithOverhead(ctx context.Context, c client.Client, imgSize int64, pvcSpec *corev1.PersistentVolumeClaimSpec) (resource.Quantity, error) {
×
1553
        var returnSize resource.Quantity
×
1554

×
1555
        if util.ResolveVolumeMode(pvcSpec.VolumeMode) == corev1.PersistentVolumeFilesystem {
×
1556
                fsOverhead, err := GetFilesystemOverheadForStorageClass(ctx, c, pvcSpec.StorageClassName)
×
1557
                if err != nil {
×
1558
                        return resource.Quantity{}, err
×
1559
                }
×
1560
                // Parse filesystem overhead (percentage) into a 64-bit float
1561
                fsOverheadFloat, _ := strconv.ParseFloat(string(fsOverhead), 64)
×
1562

×
1563
                // Merge the previous values into a 'resource.Quantity' struct
×
1564
                requiredSpace := util.GetRequiredSpace(fsOverheadFloat, imgSize)
×
1565
                returnSize = *resource.NewScaledQuantity(requiredSpace, 0)
×
1566
        } else {
×
1567
                // Inflation is not needed with 'Block' mode
×
1568
                returnSize = *resource.NewScaledQuantity(imgSize, 0)
×
1569
        }
×
1570

1571
        return returnSize, nil
×
1572
}
1573

1574
// IsBound returns if the pvc is bound
1575
func IsBound(pvc *corev1.PersistentVolumeClaim) bool {
×
1576
        return pvc != nil && pvc.Status.Phase == corev1.ClaimBound
×
1577
}
×
1578

1579
// IsUnbound returns if the pvc is not bound yet
1580
func IsUnbound(pvc *corev1.PersistentVolumeClaim) bool {
×
1581
        return !IsBound(pvc)
×
1582
}
×
1583

1584
// IsLost returns if the pvc is lost
1585
func IsLost(pvc *corev1.PersistentVolumeClaim) bool {
×
1586
        return pvc != nil && pvc.Status.Phase == corev1.ClaimLost
×
1587
}
×
1588

1589
// IsImageStream returns true if registry source is ImageStream
1590
func IsImageStream(pvc *corev1.PersistentVolumeClaim) bool {
×
1591
        return pvc.Annotations[AnnRegistryImageStream] == "true"
×
1592
}
×
1593

1594
// ShouldIgnorePod checks if a pod should be ignored.
1595
// If this is a completed pod that was used for one checkpoint of a multi-stage import, it
1596
// should be ignored by pod lookups as long as the retainAfterCompletion annotation is set.
1597
func ShouldIgnorePod(pod *corev1.Pod, pvc *corev1.PersistentVolumeClaim) bool {
×
1598
        retain := pvc.ObjectMeta.Annotations[AnnPodRetainAfterCompletion]
×
1599
        checkpoint := pvc.ObjectMeta.Annotations[AnnCurrentCheckpoint]
×
1600
        if checkpoint != "" && pod.Status.Phase == corev1.PodSucceeded {
×
1601
                return retain == "true"
×
1602
        }
×
1603
        return false
×
1604
}
1605

1606
// BuildHTTPClient generates an http client that accepts any certificate, since we are using
// it to get prometheus data it doesn't matter if someone can intercept the data. Once we have
// a mechanism to properly sign the server, we can update this method to get a proper client.
//
// A non-nil httpClient is returned unchanged. Otherwise a new client is built
// whose transport skips TLS verification and, when http.DefaultTransport is an
// *http.Transport, inherits its proxy, dialer, idle-connection and timeout
// settings.
func BuildHTTPClient(httpClient *http.Client) *http.Client {
	if httpClient != nil {
		return httpClient
	}

	// Create new Transport that ignores self-signed SSL
	//nolint:gosec
	tr := &http.Transport{
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
	}
	// http.DefaultTransport is a RoundTripper variable that other code may
	// replace or wrap; an unchecked type assertion would panic in that case, so
	// the defaults are only copied when the concrete type matches.
	if defaultTransport, ok := http.DefaultTransport.(*http.Transport); ok {
		tr.Proxy = defaultTransport.Proxy
		tr.DialContext = defaultTransport.DialContext
		tr.MaxIdleConns = defaultTransport.MaxIdleConns
		tr.IdleConnTimeout = defaultTransport.IdleConnTimeout
		tr.ExpectContinueTimeout = defaultTransport.ExpectContinueTimeout
		tr.TLSHandshakeTimeout = defaultTransport.TLSHandshakeTimeout
	}
	return &http.Client{Transport: tr}
}
1629

1630
// ErrConnectionRefused checks for connection refused errors by looking for
// "connection refused" in the error message. A nil error is never a connection
// refused error.
func ErrConnectionRefused(err error) bool {
	// err.Error() on a nil interface would panic, so nil is handled explicitly.
	if err == nil {
		return false
	}
	return strings.Contains(err.Error(), "connection refused")
}
×
1634

1635
// GetPodMetricsPort returns, if exists, the metrics port from the passed pod
1636
func GetPodMetricsPort(pod *corev1.Pod) (int, error) {
1✔
1637
        for _, container := range pod.Spec.Containers {
2✔
1638
                for _, port := range container.Ports {
2✔
1639
                        if port.Name == "metrics" {
2✔
1640
                                return int(port.ContainerPort), nil
1✔
1641
                        }
1✔
1642
                }
1643
        }
1644
        return 0, errors.New("Metrics port not found in pod")
1✔
1645
}
1646

1647
// GetMetricsURL builds the metrics URL according to the specified pod
1648
func GetMetricsURL(pod *corev1.Pod) (string, error) {
1✔
1649
        if pod == nil {
1✔
1650
                return "", nil
×
1651
        }
×
1652
        port, err := GetPodMetricsPort(pod)
1✔
1653
        if err != nil || pod.Status.PodIP == "" {
2✔
1654
                return "", err
1✔
1655
        }
1✔
1656
        domain := net.JoinHostPort(pod.Status.PodIP, fmt.Sprint(port))
1✔
1657
        url := fmt.Sprintf("https://%s/metrics", domain)
1✔
1658
        return url, nil
1✔
1659
}
1660

1661
// GetProgressReportFromURL fetches the progress report from the passed URL according to an specific metric expression and ownerUID
1662
func GetProgressReportFromURL(ctx context.Context, url string, httpClient *http.Client, metricExp, ownerUID string) (string, error) {
×
1663
        regExp := regexp.MustCompile(fmt.Sprintf("(%s)\\{ownerUID\\=%q\\} (\\d{1,3}\\.?\\d*)", metricExp, ownerUID))
×
1664
        // pod could be gone, don't block an entire thread for 30 seconds
×
1665
        // just to get back an i/o timeout
×
1666
        ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
×
1667
        defer cancel()
×
1668
        req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
×
1669
        if err != nil {
×
1670
                return "", err
×
1671
        }
×
1672
        resp, err := httpClient.Do(req)
×
1673
        if err != nil {
×
1674
                if ErrConnectionRefused(err) {
×
1675
                        return "", nil
×
1676
                }
×
1677
                return "", err
×
1678
        }
1679
        defer resp.Body.Close()
×
1680
        body, err := io.ReadAll(resp.Body)
×
1681
        if err != nil {
×
1682
                return "", err
×
1683
        }
×
1684

1685
        // Parse the progress from the body
1686
        progressReport := ""
×
1687
        match := regExp.FindStringSubmatch(string(body))
×
1688
        if match != nil {
×
1689
                progressReport = match[len(match)-1]
×
1690
        }
×
1691
        return progressReport, nil
×
1692
}
1693

1694
// UpdateHTTPAnnotations updates the passed annotations for proper http import
1695
func UpdateHTTPAnnotations(annotations map[string]string, http *cdiv1.DataVolumeSourceHTTP) {
×
1696
        annotations[AnnEndpoint] = http.URL
×
1697
        annotations[AnnSource] = SourceHTTP
×
1698

×
1699
        if http.SecretRef != "" {
×
1700
                annotations[AnnSecret] = http.SecretRef
×
1701
        }
×
1702
        if http.CertConfigMap != "" {
×
1703
                annotations[AnnCertConfigMap] = http.CertConfigMap
×
1704
        }
×
1705
        for index, header := range http.ExtraHeaders {
×
1706
                annotations[fmt.Sprintf("%s.%d", AnnExtraHeaders, index)] = header
×
1707
        }
×
1708
        for index, header := range http.SecretExtraHeaders {
×
1709
                annotations[fmt.Sprintf("%s.%d", AnnSecretExtraHeaders, index)] = header
×
1710
        }
×
1711
}
1712

1713
// UpdateS3Annotations updates the passed annotations for proper S3 import
1714
func UpdateS3Annotations(annotations map[string]string, s3 *cdiv1.DataVolumeSourceS3) {
×
1715
        annotations[AnnEndpoint] = s3.URL
×
1716
        annotations[AnnSource] = SourceS3
×
1717
        if s3.SecretRef != "" {
×
1718
                annotations[AnnSecret] = s3.SecretRef
×
1719
        }
×
1720
        if s3.CertConfigMap != "" {
×
1721
                annotations[AnnCertConfigMap] = s3.CertConfigMap
×
1722
        }
×
1723
}
1724

1725
// UpdateGCSAnnotations updates the passed annotations for proper GCS import
1726
func UpdateGCSAnnotations(annotations map[string]string, gcs *cdiv1.DataVolumeSourceGCS) {
×
1727
        annotations[AnnEndpoint] = gcs.URL
×
1728
        annotations[AnnSource] = SourceGCS
×
1729
        if gcs.SecretRef != "" {
×
1730
                annotations[AnnSecret] = gcs.SecretRef
×
1731
        }
×
1732
}
1733

1734
// UpdateRegistryAnnotations updates the passed annotations for proper registry import
1735
func UpdateRegistryAnnotations(annotations map[string]string, registry *cdiv1.DataVolumeSourceRegistry) {
×
1736
        annotations[AnnSource] = SourceRegistry
×
1737
        pullMethod := registry.PullMethod
×
1738
        if pullMethod != nil && *pullMethod != "" {
×
1739
                annotations[AnnRegistryImportMethod] = string(*pullMethod)
×
1740
        }
×
1741
        url := registry.URL
×
1742
        if url != nil && *url != "" {
×
1743
                annotations[AnnEndpoint] = *url
×
1744
        } else {
×
1745
                imageStream := registry.ImageStream
×
1746
                if imageStream != nil && *imageStream != "" {
×
1747
                        annotations[AnnEndpoint] = *imageStream
×
1748
                        annotations[AnnRegistryImageStream] = "true"
×
1749
                }
×
1750
        }
1751
        secretRef := registry.SecretRef
×
1752
        if secretRef != nil && *secretRef != "" {
×
1753
                annotations[AnnSecret] = *secretRef
×
1754
        }
×
1755
        certConfigMap := registry.CertConfigMap
×
1756
        if certConfigMap != nil && *certConfigMap != "" {
×
1757
                annotations[AnnCertConfigMap] = *certConfigMap
×
1758
        }
×
1759

1760
        if registry.Platform != nil && registry.Platform.Architecture != "" {
×
1761
                annotations[AnnRegistryImageArchitecture] = registry.Platform.Architecture
×
1762
        }
×
1763
}
1764

1765
// UpdateVDDKAnnotations updates the passed annotations for proper VDDK import
1766
func UpdateVDDKAnnotations(annotations map[string]string, vddk *cdiv1.DataVolumeSourceVDDK) {
×
1767
        annotations[AnnEndpoint] = vddk.URL
×
1768
        annotations[AnnSource] = SourceVDDK
×
1769
        annotations[AnnSecret] = vddk.SecretRef
×
1770
        annotations[AnnBackingFile] = vddk.BackingFile
×
1771
        annotations[AnnUUID] = vddk.UUID
×
1772
        annotations[AnnThumbprint] = vddk.Thumbprint
×
1773
        if vddk.InitImageURL != "" {
×
1774
                annotations[AnnVddkInitImageURL] = vddk.InitImageURL
×
1775
        }
×
1776
        if vddk.ExtraArgs != "" {
×
1777
                annotations[AnnVddkExtraArgs] = vddk.ExtraArgs
×
1778
        }
×
1779
}
1780

1781
// UpdateImageIOAnnotations updates the passed annotations for proper imageIO import
1782
func UpdateImageIOAnnotations(annotations map[string]string, imageio *cdiv1.DataVolumeSourceImageIO) {
×
1783
        annotations[AnnEndpoint] = imageio.URL
×
1784
        annotations[AnnSource] = SourceImageio
×
1785
        annotations[AnnSecret] = imageio.SecretRef
×
1786
        annotations[AnnCertConfigMap] = imageio.CertConfigMap
×
1787
        annotations[AnnDiskID] = imageio.DiskID
×
1788
        if imageio.InsecureSkipVerify != nil && *imageio.InsecureSkipVerify {
×
1789
                annotations[AnnInsecureSkipVerify] = "true"
×
1790
        }
×
1791
}
1792

1793
// IsPVBoundToPVC checks if a PV is bound to a specific PVC
1794
func IsPVBoundToPVC(pv *corev1.PersistentVolume, pvc *corev1.PersistentVolumeClaim) bool {
1✔
1795
        claimRef := pv.Spec.ClaimRef
1✔
1796
        return claimRef != nil && claimRef.Name == pvc.Name && claimRef.Namespace == pvc.Namespace && claimRef.UID == pvc.UID
1✔
1797
}
1✔
1798

1799
// Rebind binds the PV of source to target
1800
func Rebind(ctx context.Context, c client.Client, source, target *corev1.PersistentVolumeClaim) error {
1✔
1801
        pv := &corev1.PersistentVolume{
1✔
1802
                ObjectMeta: metav1.ObjectMeta{
1✔
1803
                        Name: source.Spec.VolumeName,
1✔
1804
                },
1✔
1805
        }
1✔
1806

1✔
1807
        if err := c.Get(ctx, client.ObjectKeyFromObject(pv), pv); err != nil {
2✔
1808
                return err
1✔
1809
        }
1✔
1810

1811
        // Examine the claimref for the PV and see if it's still bound to PVC'
1812
        if pv.Spec.ClaimRef == nil {
1✔
1813
                return fmt.Errorf("PV %s claimRef is nil", pv.Name)
×
1814
        }
×
1815

1816
        if !IsPVBoundToPVC(pv, source) {
2✔
1817
                // Something is not right if the PV is neither bound to PVC' nor target PVC
1✔
1818
                if !IsPVBoundToPVC(pv, target) {
2✔
1819
                        klog.Errorf("PV bound to unexpected PVC: Could not rebind to target PVC '%s'", target.Name)
1✔
1820
                        return fmt.Errorf("PV %s bound to unexpected claim %s", pv.Name, pv.Spec.ClaimRef.Name)
1✔
1821
                }
1✔
1822
                // our work is done
1823
                return nil
1✔
1824
        }
1825

1826
        // Rebind PVC to target PVC
1827
        pv.Spec.ClaimRef = &corev1.ObjectReference{
1✔
1828
                Namespace:       target.Namespace,
1✔
1829
                Name:            target.Name,
1✔
1830
                UID:             target.UID,
1✔
1831
                ResourceVersion: target.ResourceVersion,
1✔
1832
        }
1✔
1833
        klog.V(3).Info("Rebinding PV to target PVC", "PVC", target.Name)
1✔
1834
        if err := c.Update(context.TODO(), pv); err != nil {
1✔
1835
                return err
×
1836
        }
×
1837

1838
        return nil
1✔
1839
}
1840

1841
// BulkDeleteResources deletes a bunch of resources
1842
func BulkDeleteResources(ctx context.Context, c client.Client, obj client.ObjectList, lo client.ListOption) error {
×
1843
        if err := c.List(ctx, obj, lo); err != nil {
×
1844
                if meta.IsNoMatchError(err) {
×
1845
                        return nil
×
1846
                }
×
1847
                return err
×
1848
        }
1849

1850
        sv := reflect.ValueOf(obj).Elem()
×
1851
        iv := sv.FieldByName("Items")
×
1852

×
1853
        for i := 0; i < iv.Len(); i++ {
×
1854
                obj := iv.Index(i).Addr().Interface().(client.Object)
×
1855
                if obj.GetDeletionTimestamp().IsZero() {
×
1856
                        klog.V(3).Infof("Deleting type %+v %+v", reflect.TypeOf(obj), obj)
×
1857
                        if err := c.Delete(ctx, obj); err != nil {
×
1858
                                return err
×
1859
                        }
×
1860
                }
1861
        }
1862

1863
        return nil
×
1864
}
1865

1866
// ValidateSnapshotCloneSize does proper size validation when doing a clone from snapshot operation
1867
func ValidateSnapshotCloneSize(snapshot *snapshotv1.VolumeSnapshot, pvcSpec *corev1.PersistentVolumeClaimSpec, targetSC *storagev1.StorageClass, log logr.Logger) (bool, error) {
×
1868
        restoreSize := snapshot.Status.RestoreSize
×
1869
        if restoreSize == nil {
×
1870
                return false, fmt.Errorf("snapshot has no RestoreSize")
×
1871
        }
×
1872
        targetRequest, hasTargetRequest := pvcSpec.Resources.Requests[corev1.ResourceStorage]
×
1873
        allowExpansion := targetSC.AllowVolumeExpansion != nil && *targetSC.AllowVolumeExpansion
×
1874
        if hasTargetRequest {
×
1875
                // otherwise will just use restoreSize
×
1876
                if restoreSize.Cmp(targetRequest) < 0 && !allowExpansion {
×
1877
                        log.V(3).Info("Can't expand restored PVC because SC does not allow expansion, need to fall back to host assisted")
×
1878
                        return false, nil
×
1879
                }
×
1880
        }
1881
        return true, nil
×
1882
}
1883

1884
// ValidateSnapshotCloneProvisioners validates the target PVC storage class against the snapshot class provisioner
1885
func ValidateSnapshotCloneProvisioners(vsc *snapshotv1.VolumeSnapshotContent, storageClass *storagev1.StorageClass) (bool, error) {
×
1886
        // Do snapshot and storage class validation
×
1887
        if storageClass == nil {
×
1888
                return false, fmt.Errorf("target storage class not found")
×
1889
        }
×
1890
        if storageClass.Provisioner != vsc.Spec.Driver {
×
1891
                return false, nil
×
1892
        }
×
1893
        // TODO: get sourceVolumeMode from volumesnapshotcontent and validate against target spec
1894
        // currently don't have CRDs in CI with sourceVolumeMode which is pretty new
1895
        // converting volume mode is possible but has security implications
1896
        return true, nil
×
1897
}
1898

1899
// GetSnapshotClassForSmartClone looks up the snapshot class based on the storage class
1900
func GetSnapshotClassForSmartClone(pvc *corev1.PersistentVolumeClaim, targetPvcStorageClassName, snapshotClassName *string, log logr.Logger, client client.Client, recorder record.EventRecorder) (string, error) {
×
1901
        logger := log.WithName("GetSnapshotClassForSmartClone").V(3)
×
1902
        // Check if relevant CRDs are available
×
1903
        if !isCsiCrdsDeployed(client, log) {
×
1904
                logger.Info("Missing CSI snapshotter CRDs, falling back to host assisted clone")
×
1905
                return "", nil
×
1906
        }
×
1907

1908
        targetStorageClass, err := GetStorageClassByNameWithK8sFallback(context.TODO(), client, targetPvcStorageClassName)
×
1909
        if err != nil {
×
1910
                return "", err
×
1911
        }
×
1912
        if targetStorageClass == nil {
×
1913
                logger.Info("Target PVC's Storage Class not found")
×
1914
                return "", nil
×
1915
        }
×
1916

1917
        vscName, err := GetVolumeSnapshotClass(context.TODO(), client, pvc, targetStorageClass.Provisioner, snapshotClassName, logger, recorder)
×
1918
        if err != nil {
×
1919
                return "", err
×
1920
        }
×
1921
        if vscName != nil {
×
1922
                if pvc != nil {
×
1923
                        logger.Info("smart-clone is applicable for datavolume", "datavolume",
×
1924
                                pvc.Name, "snapshot class", *vscName)
×
1925
                }
×
1926
                return *vscName, nil
×
1927
        }
1928

1929
        logger.Info("Could not match snapshotter with storage class, falling back to host assisted clone")
×
1930
        return "", nil
×
1931
}
1932

1933
// GetVolumeSnapshotClass looks up the snapshot class based on the driver and an optional specific name
1934
// In case of multiple candidates, it returns the default-annotated one, or the sorted list first one if no such default
1935
func GetVolumeSnapshotClass(ctx context.Context, c client.Client, pvc *corev1.PersistentVolumeClaim, driver string, snapshotClassName *string, log logr.Logger, recorder record.EventRecorder) (*string, error) {
×
1936
        logger := log.WithName("GetVolumeSnapshotClass").V(3)
×
1937

×
1938
        logEvent := func(message, vscName string) {
×
1939
                logger.Info(message, "name", vscName)
×
1940
                if pvc != nil {
×
1941
                        msg := fmt.Sprintf("%s %s", message, vscName)
×
1942
                        recorder.Event(pvc, corev1.EventTypeNormal, VolumeSnapshotClassSelected, msg)
×
1943
                }
×
1944
        }
1945

1946
        if snapshotClassName != nil {
×
1947
                vsc := &snapshotv1.VolumeSnapshotClass{}
×
1948
                if err := c.Get(context.TODO(), types.NamespacedName{Name: *snapshotClassName}, vsc); err != nil {
×
1949
                        return nil, err
×
1950
                }
×
1951
                if vsc.Driver == driver {
×
1952
                        logEvent(MessageStorageProfileVolumeSnapshotClassSelected, vsc.Name)
×
1953
                        return snapshotClassName, nil
×
1954
                }
×
1955
                return nil, nil
×
1956
        }
1957

1958
        vscList := &snapshotv1.VolumeSnapshotClassList{}
×
1959
        if err := c.List(ctx, vscList); err != nil {
×
1960
                if meta.IsNoMatchError(err) {
×
1961
                        return nil, nil
×
1962
                }
×
1963
                return nil, err
×
1964
        }
1965

1966
        var candidates []string
×
1967
        for _, vsc := range vscList.Items {
×
1968
                if vsc.Driver == driver {
×
1969
                        if vsc.Annotations[AnnDefaultSnapshotClass] == "true" {
×
1970
                                logEvent(MessageDefaultVolumeSnapshotClassSelected, vsc.Name)
×
1971
                                vscName := vsc.Name
×
1972
                                return &vscName, nil
×
1973
                        }
×
1974
                        candidates = append(candidates, vsc.Name)
×
1975
                }
1976
        }
1977

1978
        if len(candidates) > 0 {
×
1979
                sort.Strings(candidates)
×
1980
                logEvent(MessageFirstVolumeSnapshotClassSelected, candidates[0])
×
1981
                return &candidates[0], nil
×
1982
        }
×
1983

1984
        return nil, nil
×
1985
}
1986

1987
// isCsiCrdsDeployed checks whether the CSI snapshotter CRD are deployed
1988
func isCsiCrdsDeployed(c client.Client, log logr.Logger) bool {
×
1989
        version := "v1"
×
1990
        vsClass := "volumesnapshotclasses." + snapshotv1.GroupName
×
1991
        vsContent := "volumesnapshotcontents." + snapshotv1.GroupName
×
1992
        vs := "volumesnapshots." + snapshotv1.GroupName
×
1993

×
1994
        return isCrdDeployed(c, vsClass, version, log) &&
×
1995
                isCrdDeployed(c, vsContent, version, log) &&
×
1996
                isCrdDeployed(c, vs, version, log)
×
1997
}
×
1998

1999
// isCrdDeployed checks whether a CRD is deployed
2000
func isCrdDeployed(c client.Client, name, version string, log logr.Logger) bool {
×
2001
        crd := &extv1.CustomResourceDefinition{}
×
2002
        err := c.Get(context.TODO(), types.NamespacedName{Name: name}, crd)
×
2003
        if err != nil {
×
2004
                if !k8serrors.IsNotFound(err) {
×
2005
                        log.Info("Error looking up CRD", "crd name", name, "version", version, "error", err)
×
2006
                }
×
2007
                return false
×
2008
        }
2009

2010
        for _, v := range crd.Spec.Versions {
×
2011
                if v.Name == version && v.Served {
×
2012
                        return true
×
2013
                }
×
2014
        }
2015

2016
        return false
×
2017
}
2018

2019
// IsSnapshotReady indicates if a volume snapshot is ready to be used
2020
func IsSnapshotReady(snapshot *snapshotv1.VolumeSnapshot) bool {
×
2021
        return snapshot.Status != nil && snapshot.Status.ReadyToUse != nil && *snapshot.Status.ReadyToUse
×
2022
}
×
2023

2024
// GetResource updates given obj with the data of the object with the same name and namespace
2025
func GetResource(ctx context.Context, c client.Client, namespace, name string, obj client.Object) (bool, error) {
×
2026
        obj.SetNamespace(namespace)
×
2027
        obj.SetName(name)
×
2028

×
2029
        err := c.Get(ctx, client.ObjectKeyFromObject(obj), obj)
×
2030
        if err != nil {
×
2031
                if k8serrors.IsNotFound(err) {
×
2032
                        return false, nil
×
2033
                }
×
2034

2035
                return false, err
×
2036
        }
2037

2038
        return true, nil
×
2039
}
2040

2041
// PatchArgs are the args for Patch
2042
type PatchArgs struct {
2043
        Client client.Client
2044
        Log    logr.Logger
2045
        Obj    client.Object
2046
        OldObj client.Object
2047
}
2048

2049
// GetAnnotatedEventSource returns resource referenced by AnnEventSource annotations
2050
func GetAnnotatedEventSource(ctx context.Context, c client.Client, obj client.Object) (client.Object, error) {
×
2051
        esk, ok := obj.GetAnnotations()[AnnEventSourceKind]
×
2052
        if !ok {
×
2053
                return obj, nil
×
2054
        }
×
2055
        if esk != "PersistentVolumeClaim" {
×
2056
                return obj, nil
×
2057
        }
×
2058
        es, ok := obj.GetAnnotations()[AnnEventSource]
×
2059
        if !ok {
×
2060
                return obj, nil
×
2061
        }
×
2062
        namespace, name, err := cache.SplitMetaNamespaceKey(es)
×
2063
        if err != nil {
×
2064
                return nil, err
×
2065
        }
×
2066
        pvc := &corev1.PersistentVolumeClaim{
×
2067
                ObjectMeta: metav1.ObjectMeta{
×
2068
                        Namespace: namespace,
×
2069
                        Name:      name,
×
2070
                },
×
2071
        }
×
2072
        if err := c.Get(ctx, client.ObjectKeyFromObject(pvc), pvc); err != nil {
×
2073
                return nil, err
×
2074
        }
×
2075
        return pvc, nil
×
2076
}
2077

2078
// OwnedByDataVolume returns true if the object is owned by a DataVolume
2079
func OwnedByDataVolume(obj metav1.Object) bool {
×
2080
        owner := metav1.GetControllerOf(obj)
×
2081
        return owner != nil && owner.Kind == "DataVolume"
×
2082
}
×
2083

2084
// CopyAllowedAnnotations copies the allowed annotations from the source object
2085
// to the destination object
2086
func CopyAllowedAnnotations(srcObj, dstObj metav1.Object) {
×
2087
        for ann, def := range allowedAnnotations {
×
2088
                val, ok := srcObj.GetAnnotations()[ann]
×
2089
                if !ok && def != "" {
×
2090
                        val = def
×
2091
                }
×
2092
                if val != "" {
×
2093
                        klog.V(1).Info("Applying annotation", "Name", dstObj.GetName(), ann, val)
×
2094
                        AddAnnotation(dstObj, ann, val)
×
2095
                }
×
2096
        }
2097
}
2098

2099
// CopyAllowedLabels copies allowed labels matching the validLabelsMatch regexp from the
2100
// source map to the destination object allowing overwrites
2101
func CopyAllowedLabels(srcLabels map[string]string, dstObj metav1.Object, overwrite bool) {
1✔
2102
        for label, value := range srcLabels {
2✔
2103
                if _, found := dstObj.GetLabels()[label]; (!found || overwrite) && validLabelsMatch.MatchString(label) {
2✔
2104
                        AddLabel(dstObj, label, value)
1✔
2105
                }
1✔
2106
        }
2107
}
2108

2109
// ClaimMayExistBeforeDataVolume returns true if the PVC may exist before the DataVolume
2110
func ClaimMayExistBeforeDataVolume(c client.Client, pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume) (bool, error) {
×
2111
        if ClaimIsPopulatedForDataVolume(pvc, dv) {
×
2112
                return true, nil
×
2113
        }
×
2114
        return AllowClaimAdoption(c, pvc, dv)
×
2115
}
2116

2117
// ClaimIsPopulatedForDataVolume returns true if the PVC is populated for the given DataVolume
2118
func ClaimIsPopulatedForDataVolume(pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume) bool {
×
2119
        return pvc != nil && dv != nil && pvc.Annotations[AnnPopulatedFor] == dv.Name
×
2120
}
×
2121

2122
// AllowClaimAdoption returns true if the PVC may be adopted
2123
func AllowClaimAdoption(c client.Client, pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume) (bool, error) {
×
2124
        if pvc == nil || dv == nil {
×
2125
                return false, nil
×
2126
        }
×
2127
        anno, ok := pvc.Annotations[AnnCreatedForDataVolume]
×
2128
        if ok && anno == string(dv.UID) {
×
2129
                return false, nil
×
2130
        }
×
2131
        anno, ok = dv.Annotations[AnnAllowClaimAdoption]
×
2132
        // if annotation exists, go with that regardless of featuregate
×
2133
        if ok {
×
2134
                val, _ := strconv.ParseBool(anno)
×
2135
                return val, nil
×
2136
        }
×
2137
        return featuregates.NewFeatureGates(c).ClaimAdoptionEnabled()
×
2138
}
2139

2140
// ResolveDataSourceChain resolves a DataSource reference.
2141
// Returns an error if DataSource reference is not found or
2142
// DataSource reference points to another DataSource
2143
func ResolveDataSourceChain(ctx context.Context, client client.Client, dataSource *cdiv1.DataSource) (*cdiv1.DataSource, error) {
×
2144
        if dataSource.Spec.Source.DataSource == nil {
×
2145
                return dataSource, nil
×
2146
        }
×
2147

2148
        ref := dataSource.Spec.Source.DataSource
×
2149
        refNs := GetNamespace(ref.Namespace, dataSource.Namespace)
×
2150
        if dataSource.Namespace != refNs {
×
2151
                return dataSource, ErrDataSourceCrossNamespace
×
2152
        }
×
2153
        if ref.Name == dataSource.Name && refNs == dataSource.Namespace {
×
2154
                return nil, ErrDataSourceSelfReference
×
2155
        }
×
2156

2157
        resolved := &cdiv1.DataSource{}
×
2158
        if err := client.Get(ctx, types.NamespacedName{Name: ref.Name, Namespace: refNs}, resolved); err != nil {
×
2159
                return nil, err
×
2160
        }
×
2161

2162
        if resolved.Spec.Source.DataSource != nil {
×
2163
                return nil, ErrDataSourceMaxDepthReached
×
2164
        }
×
2165

2166
        return resolved, nil
×
2167
}
2168

2169
func sortEvents(events *corev1.EventList, usingPopulator bool, pvcPrimeName string) {
1✔
2170
        // Sort event lists by containing primeName substring and most recent timestamp
1✔
2171
        sort.Slice(events.Items, func(i, j int) bool {
2✔
2172
                if usingPopulator {
2✔
2173
                        firstContainsPrime := strings.Contains(events.Items[i].Message, pvcPrimeName)
1✔
2174
                        secondContainsPrime := strings.Contains(events.Items[j].Message, pvcPrimeName)
1✔
2175

1✔
2176
                        if firstContainsPrime && !secondContainsPrime {
2✔
2177
                                return true
1✔
2178
                        }
1✔
2179
                        if !firstContainsPrime && secondContainsPrime {
2✔
2180
                                return false
1✔
2181
                        }
1✔
2182
                }
2183

2184
                // if the timestamps are the same, prioritze longer messages to make sure our sorting is deterministic
2185
                if events.Items[i].LastTimestamp.Time.Equal(events.Items[j].LastTimestamp.Time) {
1✔
2186
                        return len(events.Items[i].Message) > len(events.Items[j].Message)
×
2187
                }
×
2188

2189
                // if both contains primeName substring or neither, just sort on timestamp
2190
                return events.Items[i].LastTimestamp.Time.After(events.Items[j].LastTimestamp.Time)
1✔
2191
        })
2192
}
2193

2194
// UpdatePVCBoundContionFromEvents updates the bound condition annotations on the PVC based on recent events
2195
// This function can be used by both controller and populator packages to update PVC bound condition information
2196
func UpdatePVCBoundContionFromEvents(pvc *corev1.PersistentVolumeClaim, c client.Client, log logr.Logger) error {
×
2197
        currentPvcCopy := pvc.DeepCopy()
×
2198

×
2199
        anno := pvc.GetAnnotations()
×
2200
        if anno == nil {
×
2201
                return nil
×
2202
        }
×
2203

2204
        if IsBound(pvc) {
×
2205
                anno := pvc.GetAnnotations()
×
2206
                delete(anno, AnnBoundCondition)
×
2207
                delete(anno, AnnBoundConditionReason)
×
2208
                delete(anno, AnnBoundConditionMessage)
×
2209

×
2210
                if !reflect.DeepEqual(currentPvcCopy, pvc) {
×
2211
                        patch := client.MergeFrom(currentPvcCopy)
×
2212
                        if err := c.Patch(context.TODO(), pvc, patch); err != nil {
×
2213
                                return err
×
2214
                        }
×
2215
                }
2216

2217
                return nil
×
2218
        }
2219

2220
        if pvc.Status.Phase != corev1.ClaimPending {
×
2221
                return nil
×
2222
        }
×
2223

2224
        // set bound condition by getting the latest event
2225
        events := &corev1.EventList{}
×
2226

×
2227
        err := c.List(context.TODO(), events,
×
2228
                client.InNamespace(pvc.GetNamespace()),
×
2229
                client.MatchingFields{"involvedObject.name": pvc.GetName(),
×
2230
                        "involvedObject.uid": string(pvc.GetUID())},
×
2231
        )
×
2232

×
2233
        if err != nil {
×
2234
                // Log the error but don't fail the reconciliation
×
2235
                log.Error(err, "Unable to list events for PVC bound condition update", "pvc", pvc.Name)
×
2236
                return nil
×
2237
        }
×
2238

2239
        if len(events.Items) == 0 {
×
2240
                return nil
×
2241
        }
×
2242

2243
        pvcPrime, usingPopulator := anno[AnnPVCPrimeName]
×
2244

×
2245
        // Sort event lists by containing primeName substring and most recent timestamp
×
2246
        sortEvents(events, usingPopulator, pvcPrime)
×
2247

×
2248
        boundMessage := ""
×
2249
        // check if prime name annotation exists
×
2250
        if usingPopulator {
×
2251
                // if we are using populators get the latest event from prime pvc
×
2252
                pvcPrime = fmt.Sprintf("[%s] : ", pvcPrime)
×
2253

×
2254
                // if the first event does not contain a prime message, none will so return
×
2255
                primeIdx := strings.Index(events.Items[0].Message, pvcPrime)
×
2256
                if primeIdx == -1 {
×
2257
                        log.V(1).Info("No bound message found, skipping bound condition update", "pvc", pvc.Name)
×
2258
                        return nil
×
2259
                }
×
2260
                boundMessage = events.Items[0].Message[primeIdx+len(pvcPrime):]
×
2261
        } else {
×
2262
                // if not using populators just get the latest event
×
2263
                boundMessage = events.Items[0].Message
×
2264
        }
×
2265

2266
        // since we checked status of phase above, we know this is pending
2267
        anno[AnnBoundCondition] = "false"
×
2268
        anno[AnnBoundConditionReason] = "Pending"
×
2269
        anno[AnnBoundConditionMessage] = boundMessage
×
2270

×
2271
        patch := client.MergeFrom(currentPvcCopy)
×
2272
        if err := c.Patch(context.TODO(), pvc, patch); err != nil {
×
2273
                return err
×
2274
        }
×
2275

2276
        return nil
×
2277
}
2278

2279
// CopyEvents gets srcPvc events and re-emits them on the target PVC with the src name prefix
2280
func CopyEvents(srcPVC, targetPVC client.Object, c client.Client, recorder record.EventRecorder) {
×
2281
        srcPrefixMsg := fmt.Sprintf("[%s] : ", srcPVC.GetName())
×
2282

×
2283
        newEvents := &corev1.EventList{}
×
2284
        err := c.List(context.TODO(), newEvents,
×
2285
                client.InNamespace(srcPVC.GetNamespace()),
×
2286
                client.MatchingFields{"involvedObject.name": srcPVC.GetName(),
×
2287
                        "involvedObject.uid": string(srcPVC.GetUID())},
×
2288
        )
×
2289

×
2290
        if err != nil {
×
2291
                klog.Error(err, "Could not retrieve srcPVC list of Events")
×
2292
        }
×
2293

2294
        currEvents := &corev1.EventList{}
×
2295
        err = c.List(context.TODO(), currEvents,
×
2296
                client.InNamespace(targetPVC.GetNamespace()),
×
2297
                client.MatchingFields{"involvedObject.name": targetPVC.GetName(),
×
2298
                        "involvedObject.uid": string(targetPVC.GetUID())},
×
2299
        )
×
2300

×
2301
        if err != nil {
×
2302
                klog.Error(err, "Could not retrieve targetPVC list of Events")
×
2303
        }
×
2304

2305
        // use this to hash each message for quick lookup, value is unused
2306
        eventMap := map[string]struct{}{}
×
2307

×
2308
        for _, event := range currEvents.Items {
×
2309
                eventMap[event.Message] = struct{}{}
×
2310
        }
×
2311

2312
        for _, newEvent := range newEvents.Items {
×
2313
                msg := newEvent.Message
×
2314

×
2315
                // check if target PVC already has this equivalent event
×
2316
                if _, exists := eventMap[msg]; exists {
×
2317
                        continue
×
2318
                }
2319

2320
                formattedMsg := srcPrefixMsg + msg
×
2321
                // check if we already emitted this event with the src prefix
×
2322
                if _, exists := eventMap[formattedMsg]; exists {
×
2323
                        continue
×
2324
                }
2325
                recorder.Event(targetPVC, newEvent.Type, newEvent.Reason, formattedMsg)
×
2326
        }
2327
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc