• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubernetes-sigs / blob-csi-driver / 20685028302

04 Jan 2026 12:20AM UTC coverage: 79.56% (-1.0%) from 80.538%
20685028302

Pull #2293

github

landreasyan
chore: add prometheus csi metrics
Pull Request #2293: feat: Add CSI-specific Prometheus metrics

71 of 129 new or added lines in 4 files covered. (55.04%)

283 existing lines in 3 files now uncovered.

2569 of 3229 relevant lines covered (79.56%)

8.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.18
/pkg/blob/controllerserver.go
1
/*
2
Copyright 2017 The Kubernetes Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package blob
18

19
import (
20
        "context"
21
        "fmt"
22
        "net/url"
23
        "os"
24
        "os/exec"
25
        "strconv"
26
        "strings"
27
        "time"
28

29
        "google.golang.org/grpc/codes"
30
        "google.golang.org/grpc/status"
31

32
        "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
33
        "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage"
34
        "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
35
        "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
36
        "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
37
        azstorage "github.com/Azure/azure-sdk-for-go/storage"
38
        "github.com/container-storage-interface/spec/lib/go/csi"
39

40
        "k8s.io/apimachinery/pkg/util/wait"
41
        "k8s.io/klog/v2"
42
        "k8s.io/utils/ptr"
43

44
        csiMetrics "sigs.k8s.io/blob-csi-driver/pkg/metrics"
45
        "sigs.k8s.io/blob-csi-driver/pkg/util"
46
        "sigs.k8s.io/cloud-provider-azure/pkg/azclient/blobcontainerclient"
47
        azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache"
48
        "sigs.k8s.io/cloud-provider-azure/pkg/metrics"
49
        "sigs.k8s.io/cloud-provider-azure/pkg/provider/storage"
50
)
51

52
const (
53
        privateEndpoint = "privateendpoint"
54

55
        azcopyAutoLoginType             = "AZCOPY_AUTO_LOGIN_TYPE"
56
        azcopySPAApplicationID          = "AZCOPY_SPA_APPLICATION_ID"
57
        azcopySPAClientSecret           = "AZCOPY_SPA_CLIENT_SECRET"
58
        azcopyTenantID                  = "AZCOPY_TENANT_ID"
59
        azcopyMSIClientID               = "AZCOPY_MSI_CLIENT_ID"
60
        MSI                             = "MSI"
61
        SPN                             = "SPN"
62
        authorizationPermissionMismatch = "AuthorizationPermissionMismatch"
63

64
        createdByMetadata = "createdBy"
65
)
66

67
// CreateVolume provisions a volume
68
func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
25✔
69
        if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
25✔
UNCOV
70
                klog.Errorf("invalid create volume req: %v", req)
×
UNCOV
71
                return nil, err
×
UNCOV
72
        }
×
73

74
        volName := req.GetName()
25✔
75
        if len(volName) == 0 {
26✔
76
                return nil, status.Error(codes.InvalidArgument, "CreateVolume Name must be provided")
1✔
77
        }
1✔
78

79
        if err := isValidVolumeCapabilities(req.GetVolumeCapabilities()); err != nil {
26✔
80
                return nil, status.Error(codes.InvalidArgument, err.Error())
2✔
81
        }
2✔
82

83
        volSizeBytes := int64(req.GetCapacityRange().GetRequiredBytes())
22✔
84
        requestGiB := int(util.RoundUpGiB(volSizeBytes))
22✔
85

22✔
86
        volContentSource := req.GetVolumeContentSource()
22✔
87
        secrets := req.GetSecrets()
22✔
88

22✔
89
        parameters := req.GetParameters()
22✔
90
        if parameters == nil {
23✔
91
                parameters = make(map[string]string)
1✔
92
        }
1✔
93
        var storageAccountType, subsID, resourceGroup, location, account, containerName, containerNamePrefix, protocol, customTags, secretName, secretNamespace, pvcNamespace, tagValueDelimiter string
22✔
94
        var isHnsEnabled, requireInfraEncryption, enableBlobVersioning, createPrivateEndpoint, enableNfsV3, allowSharedKeyAccess *bool
22✔
95
        var vnetResourceGroup, vnetName, vnetLinkName, publicNetworkAccess, subnetName, accessTier, networkEndpointType, storageEndpointSuffix, fsGroupChangePolicy, srcAccountName string
22✔
96
        var matchTags, useDataPlaneAPI, getLatestAccountKey bool
22✔
97
        var softDeleteBlobs, softDeleteContainers int32
22✔
98
        var vnetResourceIDs []string
22✔
99
        var err error
22✔
100
        // set allowBlobPublicAccess as false by default
22✔
101
        allowBlobPublicAccess := ptr.To(false)
22✔
102

22✔
103
        containerNameReplaceMap := map[string]string{}
22✔
104

22✔
105
        // store account key to k8s secret by default
22✔
106
        storeAccountKey := true
22✔
107

22✔
108
        // Apply ProvisionerParameters (case-insensitive). We leave validation of
22✔
109
        // the values to the cloud provider.
22✔
110
        for k, v := range parameters {
133✔
111
                switch strings.ToLower(k) {
111✔
112
                case skuNameField:
10✔
113
                        storageAccountType = v
10✔
114
                case storageAccountTypeField:
11✔
115
                        storageAccountType = v
11✔
116
                case locationField:
10✔
117
                        location = v
10✔
118
                case storageAccountField:
11✔
119
                        account = v
11✔
UNCOV
120
                case subscriptionIDField:
×
UNCOV
121
                        subsID = v
×
122
                case resourceGroupField:
10✔
123
                        resourceGroup = v
10✔
124
                case containerNameField:
11✔
125
                        containerName = v
11✔
126
                case containerNamePrefixField:
2✔
127
                        containerNamePrefix = v
2✔
128
                case protocolField:
10✔
129
                        protocol = v
10✔
130
                case tagsField:
1✔
131
                        customTags = v
1✔
132
                case matchTagsField:
1✔
133
                        matchTags = strings.EqualFold(v, trueValue)
1✔
UNCOV
134
                case secretNameField:
×
UNCOV
135
                        secretName = v
×
UNCOV
136
                case secretNamespaceField:
×
137
                        secretNamespace = v
×
138
                case isHnsEnabledField:
×
139
                        if strings.EqualFold(v, trueValue) {
×
140
                                isHnsEnabled = ptr.To(true)
×
141
                        }
×
142
                case softDeleteBlobsField:
×
143
                        days, err := parseDays(v)
×
144
                        if err != nil {
×
145
                                return nil, err
×
146
                        }
×
147
                        softDeleteBlobs = days
×
148
                case softDeleteContainersField:
×
149
                        days, err := parseDays(v)
×
150
                        if err != nil {
×
151
                                return nil, err
×
152
                        }
×
153
                        softDeleteContainers = days
×
154
                case enableBlobVersioningField:
×
155
                        enableBlobVersioning = ptr.To(strings.EqualFold(v, trueValue))
×
156
                case storeAccountKeyField:
3✔
157
                        if strings.EqualFold(v, falseValue) {
5✔
158
                                storeAccountKey = false
2✔
159
                        }
2✔
160
                case getLatestAccountKeyField:
1✔
161
                        if getLatestAccountKey, err = strconv.ParseBool(v); err != nil {
2✔
162
                                return nil, status.Errorf(codes.InvalidArgument, "invalid %s: %s in volume context", getLatestAccountKeyField, v)
1✔
163
                        }
1✔
UNCOV
164
                case allowBlobPublicAccessField:
×
UNCOV
165
                        if strings.EqualFold(v, trueValue) {
×
UNCOV
166
                                allowBlobPublicAccess = ptr.To(true)
×
167
                        }
×
168
                case publicNetworkAccessField:
1✔
169
                        publicNetworkAccess = v
1✔
170
                case allowSharedKeyAccessField:
2✔
171
                        var boolValue bool
2✔
172
                        if boolValue, err = strconv.ParseBool(v); err != nil {
3✔
173
                                return nil, status.Errorf(codes.InvalidArgument, "invalid %s: %s in volume context", allowSharedKeyAccessField, v)
1✔
174
                        }
1✔
175
                        allowSharedKeyAccess = ptr.To(boolValue)
1✔
UNCOV
176
                case requireInfraEncryptionField:
×
UNCOV
177
                        if strings.EqualFold(v, trueValue) {
×
UNCOV
178
                                requireInfraEncryption = ptr.To(true)
×
179
                        }
×
180
                case pvcNamespaceKey:
×
181
                        pvcNamespace = v
×
182
                        containerNameReplaceMap[pvcNamespaceMetadata] = v
×
183
                case pvcNameKey:
×
184
                        containerNameReplaceMap[pvcNameMetadata] = v
×
185
                case pvNameKey:
×
186
                        containerNameReplaceMap[pvNameMetadata] = v
×
187
                case serverNameField:
×
188
                case storageAuthTypeField:
1✔
189
                case storageIdentityClientIDField:
1✔
190
                case storageIdentityObjectIDField:
1✔
191
                case storageIdentityResourceIDField:
1✔
192
                case clientIDField:
1✔
193
                case mountWithWITokenField:
1✔
194
                case tenantIDField:
1✔
195
                case msiEndpointField:
1✔
196
                case storageAADEndpointField:
1✔
197
                case blobStorageAccountTypeField:
1✔
198
                        // no op, only used in NodeStageVolume
UNCOV
199
                case storageEndpointSuffixField:
×
UNCOV
200
                        storageEndpointSuffix = v
×
UNCOV
201
                case vnetResourceGroupField:
×
202
                        vnetResourceGroup = v
×
203
                case vnetNameField:
×
204
                        vnetName = v
×
205
                case vnetLinkNameField:
1✔
206
                        vnetLinkName = v
1✔
207
                case subnetNameField:
1✔
208
                        subnetName = v
1✔
UNCOV
209
                case accessTierField:
×
UNCOV
210
                        accessTier = v
×
211
                case networkEndpointTypeField:
1✔
212
                        networkEndpointType = v
1✔
213
                case mountPermissionsField:
11✔
214
                        // only do validations here, used in NodeStageVolume, NodePublishVolume
11✔
215
                        if v != "" {
22✔
216
                                if _, err := strconv.ParseUint(v, 8, 32); err != nil {
12✔
217
                                        return nil, status.Errorf(codes.InvalidArgument, "invalid mountPermissions %s in storage class", v)
1✔
218
                                }
1✔
219
                        }
220
                case useDataPlaneAPIField:
1✔
221
                        useDataPlaneAPI = strings.EqualFold(v, trueValue)
1✔
222
                case fsGroupChangePolicyField:
1✔
223
                        fsGroupChangePolicy = v
1✔
UNCOV
224
                case tagValueDelimiterField:
×
UNCOV
225
                        tagValueDelimiter = v
×
226
                default:
1✔
227
                        return nil, status.Errorf(codes.InvalidArgument, "invalid parameter %q in storage class", k)
1✔
228
                }
229
        }
230

231
        if ptr.Deref(enableBlobVersioning, false) {
18✔
UNCOV
232
                if isNFSProtocol(protocol) || ptr.Deref(isHnsEnabled, false) {
×
UNCOV
233
                        return nil, status.Errorf(codes.InvalidArgument, "enableBlobVersioning is not supported for NFS protocol or HNS enabled account")
×
UNCOV
234
                }
×
235
        }
236

237
        if !isSupportedFSGroupChangePolicy(fsGroupChangePolicy) {
19✔
238
                return nil, status.Errorf(codes.InvalidArgument, "fsGroupChangePolicy(%s) is not supported, supported fsGroupChangePolicy list: %v", fsGroupChangePolicy, supportedFSGroupChangePolicyList)
1✔
239
        }
1✔
240

241
        if matchTags && account != "" {
18✔
242
                return nil, status.Errorf(codes.InvalidArgument, "matchTags must set as false when storageAccount(%s) is provided", account)
1✔
243
        }
1✔
244

245
        if resourceGroup == "" {
23✔
246
                resourceGroup = d.cloud.ResourceGroup
7✔
247
        }
7✔
248

249
        if secretNamespace == "" {
32✔
250
                if pvcNamespace == "" {
32✔
251
                        secretNamespace = defaultNamespace
16✔
252
                } else {
16✔
UNCOV
253
                        secretNamespace = pvcNamespace
×
UNCOV
254
                }
×
255
        }
256

257
        if protocol == "" {
22✔
258
                protocol = Fuse
6✔
259
        }
6✔
260
        if !isSupportedProtocol(protocol) {
17✔
261
                return nil, status.Errorf(codes.InvalidArgument, "protocol(%s) is not supported, supported protocol list: %v", protocol, supportedProtocolList)
1✔
262
        }
1✔
263
        if !isSupportedAccessTier(accessTier) {
15✔
UNCOV
264
                return nil, status.Errorf(codes.InvalidArgument, "accessTier(%s) is not supported, supported AccessTier list: %v", accessTier, armstorage.PossibleAccessTierValues())
×
UNCOV
265
        }
×
266
        if !isSupportedPublicNetworkAccess(publicNetworkAccess) {
16✔
267
                return nil, status.Errorf(codes.InvalidArgument, "publicNetworkAccess(%s) is not supported, supported PublicNetworkAccess list: %v", publicNetworkAccess, armstorage.PossiblePublicNetworkAccessValues())
1✔
268
        }
1✔
269

270
        if containerName != "" && containerNamePrefix != "" {
15✔
271
                return nil, status.Errorf(codes.InvalidArgument, "containerName(%s) and containerNamePrefix(%s) could not be specified together", containerName, containerNamePrefix)
1✔
272
        }
1✔
273
        if !isSupportedContainerNamePrefix(containerNamePrefix) {
14✔
274
                return nil, status.Errorf(codes.InvalidArgument, "containerNamePrefix(%s) can only contain lowercase letters, numbers, hyphens, and length should be less than 21", containerNamePrefix)
1✔
275
        }
1✔
276

277
        enableHTTPSTrafficOnly := true
12✔
278
        if strings.EqualFold(networkEndpointType, privateEndpoint) {
13✔
279
                if strings.Contains(subnetName, ",") {
2✔
280
                        return nil, status.Errorf(codes.InvalidArgument, "subnetName(%s) can only contain one subnet for private endpoint", subnetName)
1✔
281
                }
1✔
UNCOV
282
                createPrivateEndpoint = ptr.To(true)
×
283
        }
284
        accountKind := string(armstorage.KindStorageV2)
11✔
285
        if isNFSProtocol(protocol) {
12✔
286
                isHnsEnabled = ptr.To(true)
1✔
287
                enableNfsV3 = ptr.To(true)
1✔
288
                // NFS protocol does not need account key
1✔
289
                storeAccountKey = false
1✔
290
                if !ptr.Deref(createPrivateEndpoint, false) {
2✔
291
                        // set VirtualNetworkResourceIDs for storage account firewall setting
1✔
292
                        var err error
1✔
293
                        if vnetResourceIDs, err = d.updateSubnetServiceEndpoints(ctx, vnetResourceGroup, vnetName, subnetName); err != nil {
2✔
294
                                return nil, status.Errorf(codes.Internal, "update service endpoints failed with error: %v", err)
1✔
295
                        }
1✔
296
                }
297
        }
298

299
        if strings.HasPrefix(strings.ToLower(storageAccountType), "premium") {
11✔
300
                accountKind = string(armstorage.KindBlockBlobStorage)
1✔
301
        }
1✔
302
        if IsAzureStackCloud(d.cloud) {
11✔
303
                accountKind = string(armstorage.KindStorage)
1✔
304
                if storageAccountType != "" && storageAccountType != string(armstorage.SKUNameStandardLRS) && storageAccountType != string(armstorage.SKUNamePremiumLRS) {
2✔
305
                        return nil, status.Errorf(codes.InvalidArgument, "Invalid skuName value: %s, as Azure Stack only supports %s and %s Storage Account types.", storageAccountType, armstorage.SKUNamePremiumLRS, armstorage.SKUNameStandardLRS)
1✔
306
                }
1✔
307
        }
308

309
        tags, err := util.ConvertTagsToMap(customTags, tagValueDelimiter)
9✔
310
        if err != nil {
10✔
311
                return nil, status.Errorf(codes.InvalidArgument, "%v", err)
1✔
312
        }
1✔
313

314
        if strings.TrimSpace(storageEndpointSuffix) == "" {
16✔
315
                storageEndpointSuffix = d.getStorageEndPointSuffix()
8✔
316
        }
8✔
317

318
        if storeAccountKey && !ptr.Deref(allowSharedKeyAccess, true) {
9✔
319
                return nil, status.Errorf(codes.InvalidArgument, "storeAccountKey is not supported for account with shared access key disabled")
1✔
320
        }
1✔
321

322
        if volContentSource != nil {
9✔
323
                switch volContentSource.Type.(type) {
2✔
324
                case *csi.VolumeContentSource_Snapshot:
1✔
325
                        return nil, status.Errorf(codes.InvalidArgument, "VolumeContentSource Snapshot is not yet implemented")
1✔
326
                case *csi.VolumeContentSource_Volume:
1✔
327
                        if volContentSource.GetVolume() != nil {
2✔
328
                                sourceID := volContentSource.GetVolume().VolumeId
1✔
329
                                _, srcAccountName, _, _, _, err = GetContainerInfo(sourceID) //nolint:dogsled
1✔
330
                                if err != nil {
2✔
331
                                        klog.Errorf("failed to get source volume info from sourceID(%s), error: %v", sourceID, err)
1✔
332
                                } else {
1✔
UNCOV
333
                                        klog.V(2).Infof("source volume account name: %s, sourceID: %s", srcAccountName, sourceID)
×
UNCOV
334
                                }
×
335
                        }
336
                }
337
        }
338

339
        accountOptions := &storage.AccountOptions{
6✔
340
                Name:                            account,
6✔
341
                Type:                            storageAccountType,
6✔
342
                Kind:                            accountKind,
6✔
343
                SubscriptionID:                  subsID,
6✔
344
                ResourceGroup:                   resourceGroup,
6✔
345
                Location:                        location,
6✔
346
                EnableHTTPSTrafficOnly:          enableHTTPSTrafficOnly,
6✔
347
                VirtualNetworkResourceIDs:       vnetResourceIDs,
6✔
348
                Tags:                            tags,
6✔
349
                MatchTags:                       matchTags,
6✔
350
                IsHnsEnabled:                    isHnsEnabled,
6✔
351
                EnableNfsV3:                     enableNfsV3,
6✔
352
                AllowBlobPublicAccess:           allowBlobPublicAccess,
6✔
353
                PublicNetworkAccess:             publicNetworkAccess,
6✔
354
                AllowSharedKeyAccess:            allowSharedKeyAccess,
6✔
355
                RequireInfrastructureEncryption: requireInfraEncryption,
6✔
356
                VNetResourceGroup:               vnetResourceGroup,
6✔
357
                VNetName:                        vnetName,
6✔
358
                VNetLinkName:                    vnetLinkName,
6✔
359
                SubnetName:                      subnetName,
6✔
360
                AccessTier:                      accessTier,
6✔
361
                CreatePrivateEndpoint:           createPrivateEndpoint,
6✔
362
                StorageType:                     storage.StorageTypeBlob,
6✔
363
                StorageEndpointSuffix:           storageEndpointSuffix,
6✔
364
                EnableBlobVersioning:            enableBlobVersioning,
6✔
365
                SoftDeleteBlobs:                 softDeleteBlobs,
6✔
366
                SoftDeleteContainers:            softDeleteContainers,
6✔
367
                GetLatestAccountKey:             getLatestAccountKey,
6✔
368
                SourceAccountName:               srcAccountName,
6✔
369
        }
6✔
370

6✔
371
        containerName = replaceWithMap(containerName, containerNameReplaceMap)
6✔
372
        validContainerName := containerName
6✔
373
        if validContainerName == "" {
7✔
374
                validContainerName = volName
1✔
375
                if containerNamePrefix != "" {
1✔
UNCOV
376
                        validContainerName = containerNamePrefix + "-" + volName
×
UNCOV
377
                }
×
378
                validContainerName = getValidContainerName(validContainerName, protocol)
1✔
379
                setKeyValueInMap(parameters, containerNameField, validContainerName)
1✔
380
        }
381

382
        if acquired := d.volumeLocks.TryAcquire(volName); !acquired {
6✔
UNCOV
383
                // logging the job status if it's volume cloning
×
UNCOV
384
                if volContentSource != nil {
×
UNCOV
385
                        jobState, percent, err := d.azcopy.GetAzcopyJob(validContainerName, []string{})
×
UNCOV
386
                        return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsWithAzcopyFmt, volName, jobState, percent, err)
×
UNCOV
387
                }
×
388
                return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName)
×
389
        }
390
        defer d.volumeLocks.Release(volName)
6✔
391

6✔
392
        var volumeID string
6✔
393
        csiMC := csiMetrics.NewCSIMetricContext("controller_create_volume")
6✔
394
        isOperationSucceeded := false
6✔
395
        defer func() {
12✔
396
                csiMC.ObserveWithLabels(isOperationSucceeded,
6✔
397
                        "volume_id", volumeID,
6✔
398
                        "storage_account_type", storageAccountType,
6✔
399
                        "protocol", protocol)
6✔
400
        }()
6✔
401

402
        var accountKey string
6✔
403
        accountName := account
6✔
404
        if len(secrets) == 0 && accountName == "" {
7✔
405
                if v, ok := d.volMap.Load(volName); ok {
1✔
UNCOV
406
                        accountName = v.(string)
×
407
                } else {
1✔
408
                        lockKey := fmt.Sprintf("%s%s%s%s%s%v", storageAccountType, accountKind, resourceGroup, location, protocol, ptr.Deref(createPrivateEndpoint, false))
1✔
409
                        // search in cache first
1✔
410
                        cache, err := d.accountSearchCache.Get(ctx, lockKey, azcache.CacheReadTypeDefault)
1✔
411
                        if err != nil {
1✔
UNCOV
412
                                return nil, status.Errorf(codes.Internal, "%v", err)
×
UNCOV
413
                        }
×
414
                        if cache != nil {
1✔
UNCOV
415
                                accountName = cache.(string)
×
416
                        } else {
1✔
417
                                d.volLockMap.LockEntry(lockKey)
1✔
418
                                err = wait.ExponentialBackoff(getBackOff(d.cloud.Config), func() (bool, error) {
2✔
419
                                        var retErr error
1✔
420
                                        azureMC := metrics.NewMetricContext(blobCSIDriverName, "storage_account_ensure", d.cloud.ResourceGroup, d.cloud.SubscriptionID, d.Name)
1✔
421
                                        isAzureOpSucceeded := false
1✔
422
                                        defer func() {
2✔
423
                                                azureMC.ObserveOperationWithResult(isAzureOpSucceeded)
1✔
424
                                        }()
1✔
425
                                        accountName, accountKey, retErr = d.cloud.EnsureStorageAccount(ctx, accountOptions, protocol)
1✔
426
                                        if isRetriableError(retErr) {
1✔
UNCOV
427
                                                klog.Warningf("EnsureStorageAccount(%s) failed with error(%v), waiting for retrying", account, retErr)
×
UNCOV
428
                                                return false, nil
×
UNCOV
429
                                        }
×
430
                                        isAzureOpSucceeded = (retErr == nil)
1✔
431
                                        return true, retErr
1✔
432
                                })
433
                                d.volLockMap.UnlockEntry(lockKey)
1✔
434
                                if err != nil {
2✔
435
                                        return nil, status.Errorf(codes.Internal, "ensure storage account failed with %v", err)
1✔
436
                                }
1✔
UNCOV
437
                                d.accountSearchCache.Set(lockKey, accountName)
×
UNCOV
438
                                d.volMap.Store(volName, accountName)
×
439
                        }
440
                }
441
        }
442

443
        if ptr.Deref(createPrivateEndpoint, false) && isNFSProtocol(protocol) {
5✔
UNCOV
444
                // As for blobfuse/blobfuse2, serverName, i.e.,AZURE_STORAGE_BLOB_ENDPOINT env variable can't include
×
UNCOV
445
                // "privatelink", issue: https://github.com/Azure/azure-storage-fuse/issues/1014
×
UNCOV
446
                //
×
UNCOV
447
                // And use public endpoint will be befine to blobfuse/blobfuse2, because it will be resolved to private endpoint
×
448
                // by private dns zone, which includes CNAME record, documented here:
×
449
                // https://learn.microsoft.com/en-us/azure/storage/common/storage-private-endpoints?toc=%2Fazure%2Fstorage%2Fblobs%2Ftoc.json&bc=%2Fazure%2Fstorage%2Fblobs%2Fbreadcrumb%2Ftoc.json#dns-changes-for-private-endpoints
×
450
                setKeyValueInMap(parameters, serverNameField, fmt.Sprintf("%s.privatelink.blob.%s", accountName, storageEndpointSuffix))
×
451
        }
×
452

453
        accountOptions.Name = accountName
5✔
454
        if len(secrets) == 0 && useDataPlaneAPI {
6✔
455
                if accountKey == "" {
2✔
456
                        if accountName, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
2✔
457
                                return nil, status.Errorf(codes.Internal, "failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
1✔
458
                        }
1✔
459
                }
UNCOV
460
                secrets = createStorageAccountSecret(accountName, accountKey)
×
461
        }
462

463
        klog.V(2).Infof("begin to create container(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d)", validContainerName, accountName, storageAccountType, subsID, resourceGroup, location, requestGiB)
4✔
464
        if err := d.CreateBlobContainer(ctx, subsID, resourceGroup, accountName, validContainerName, storageEndpointSuffix, secrets); err != nil {
5✔
465
                return nil, status.Errorf(codes.Internal, "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v", validContainerName, accountName, storageAccountType, resourceGroup, location, requestGiB, err)
1✔
466
        }
1✔
467
        if volContentSource != nil {
4✔
468
                accountSASToken, authAzcopyEnv, err := d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace, false)
1✔
469
                if err != nil {
1✔
UNCOV
470
                        return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
×
UNCOV
471
                }
×
472
                copyErr := d.copyVolume(ctx, req, accountName, accountSASToken, authAzcopyEnv, validContainerName, secretNamespace, accountOptions, storageEndpointSuffix)
1✔
473
                if accountSASToken == "" && copyErr != nil && strings.Contains(copyErr.Error(), authorizationPermissionMismatch) {
1✔
474
                        klog.Warningf("azcopy copy failed with AuthorizationPermissionMismatch error, should assign \"Storage Blob Data Contributor\" role to controller identity, fall back to use sas token, original error: %v", copyErr)
×
475
                        accountSASToken, authAzcopyEnv, err := d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace, true)
×
UNCOV
476
                        if err != nil {
×
UNCOV
477
                                return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
×
478
                        }
×
479
                        copyErr = d.copyVolume(ctx, req, accountName, accountSASToken, authAzcopyEnv, validContainerName, secretNamespace, accountOptions, storageEndpointSuffix)
×
480
                }
481
                if copyErr != nil {
2✔
482
                        return nil, copyErr
1✔
483
                }
1✔
484
        }
485

486
        if storeAccountKey && len(req.GetSecrets()) == 0 {
4✔
487
                if accountKey == "" {
4✔
488
                        if accountName, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
3✔
489
                                return nil, status.Errorf(codes.Internal, "failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
1✔
490
                        }
1✔
491
                }
492

493
                secretName, err := setAzureCredentials(ctx, d.KubeClient, accountName, accountKey, secretNamespace)
1✔
494
                if err != nil {
1✔
UNCOV
495
                        return nil, status.Errorf(codes.Internal, "failed to store storage account key: %v", err)
×
UNCOV
496
                }
×
497
                if secretName != "" {
1✔
UNCOV
498
                        klog.V(2).Infof("store account key to k8s secret(%v) in %s namespace", secretName, secretNamespace)
×
499
                }
×
500
        }
501

502
        var uuid string
1✔
503
        if containerName != "" {
2✔
504
                // add volume name as suffix to differentiate volumeID since "containerName" is specified
1✔
505
                // not necessary for dynamic container name creation since volumeID already contains volume name
1✔
506
                uuid = volName
1✔
507
        }
1✔
508
        volumeID = fmt.Sprintf(volumeIDTemplate, resourceGroup, accountName, validContainerName, uuid, secretNamespace, subsID)
1✔
509
        klog.V(2).Infof("create container %s on storage account %s successfully", validContainerName, accountName)
1✔
510

1✔
511
        if useDataPlaneAPI {
1✔
UNCOV
512
                d.dataPlaneAPIVolCache.Set(volumeID, "")
×
UNCOV
513
                d.dataPlaneAPIVolCache.Set(accountName, "")
×
UNCOV
514
        }
×
515

516
        isOperationSucceeded = true
1✔
517
        // reset secretNamespace field in VolumeContext
1✔
518
        setKeyValueInMap(parameters, secretNamespaceField, secretNamespace)
1✔
519
        return &csi.CreateVolumeResponse{
1✔
520
                Volume: &csi.Volume{
1✔
521
                        VolumeId:      volumeID,
1✔
522
                        CapacityBytes: req.GetCapacityRange().GetRequiredBytes(),
1✔
523
                        VolumeContext: parameters,
1✔
524
                        ContentSource: volContentSource,
1✔
525
                },
1✔
526
        }, nil
1✔
527
}
528

529
// DeleteVolume delete a volume
530
func (d *Driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
5✔
531
        volumeID := req.GetVolumeId()
5✔
532
        if len(volumeID) == 0 {
6✔
533
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
534
        }
1✔
535

536
        if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
4✔
UNCOV
537
                return nil, status.Errorf(codes.Internal, "invalid delete volume req: %v", req)
×
UNCOV
538
        }
×
539

540
        if acquired := d.volumeLocks.TryAcquire(volumeID); !acquired {
4✔
541
                return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
×
542
        }
×
543
        defer d.volumeLocks.Release(volumeID)
4✔
544

4✔
545
        resourceGroupName, accountName, containerName, _, subsID, err := GetContainerInfo(volumeID)
4✔
546
        if err != nil {
5✔
547
                // According to CSI Driver Sanity Tester, should succeed when an invalid volume id is used
1✔
548
                klog.Errorf("GetContainerInfo(%s) in DeleteVolume failed with error: %v", volumeID, err)
1✔
549
                return &csi.DeleteVolumeResponse{}, nil
1✔
550
        }
1✔
551

552
        secrets := req.GetSecrets()
3✔
553
        if len(secrets) == 0 && d.useDataPlaneAPI(ctx, volumeID, accountName) {
4✔
554
                _, accountName, accountKey, _, _, err := d.GetAuthEnv(ctx, volumeID, "", nil, secrets)
1✔
555
                if err != nil {
2✔
556
                        return nil, status.Errorf(codes.Internal, "GetAuthEnv(%s) failed with %v", volumeID, err)
1✔
557
                }
1✔
UNCOV
558
                if accountName != "" && accountKey != "" {
×
UNCOV
559
                        secrets = createStorageAccountSecret(accountName, accountKey)
×
UNCOV
560
                }
×
561
        }
562

563
        csiMC := csiMetrics.NewCSIMetricContext("controller_delete_volume")
2✔
564
        isOperationSucceeded := false
2✔
565
        defer func() {
4✔
566
                csiMC.ObserveWithLabels(isOperationSucceeded, "volume_id", volumeID)
2✔
567
        }()
2✔
568

569
        if resourceGroupName == "" {
3✔
570
                resourceGroupName = d.cloud.ResourceGroup
1✔
571
        }
1✔
572
        klog.V(2).Infof("deleting container(%s) rg(%s) account(%s) volumeID(%s)", containerName, resourceGroupName, accountName, volumeID)
2✔
573
        if err := d.DeleteBlobContainer(ctx, subsID, resourceGroupName, accountName, containerName, secrets); err != nil {
4✔
574
                return nil, status.Errorf(codes.Internal, "failed to delete container(%s) under rg(%s) account(%s) volumeID(%s), error: %v", containerName, resourceGroupName, accountName, volumeID, err)
2✔
575
        }
2✔
576

UNCOV
577
        isOperationSucceeded = true
×
UNCOV
578
        klog.V(2).Infof("container(%s) under rg(%s) account(%s) volumeID(%s) is deleted successfully", containerName, resourceGroupName, accountName, volumeID)
×
UNCOV
579
        return &csi.DeleteVolumeResponse{}, nil
×
580
}
581

582
// ValidateVolumeCapabilities return the capabilities of the volume
583
func (d *Driver) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
8✔
584
        volumeID := req.GetVolumeId()
8✔
585
        if len(volumeID) == 0 {
9✔
586
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
587
        }
1✔
588
        if err := isValidVolumeCapabilities(req.GetVolumeCapabilities()); err != nil {
9✔
589
                return nil, status.Error(codes.InvalidArgument, err.Error())
2✔
590
        }
2✔
591

592
        csiMC := csiMetrics.NewCSIMetricContext("controller_validate_volume_capabilities")
5✔
593
        isOperationSucceeded := false
5✔
594
        defer func() {
10✔
595
                csiMC.Observe(isOperationSucceeded)
5✔
596
        }()
5✔
597

598
        resourceGroupName, accountName, containerName, _, subsID, err := GetContainerInfo(volumeID)
5✔
599
        if err != nil {
6✔
600
                klog.Errorf("GetContainerInfo(%s) in ValidateVolumeCapabilities failed with error: %v", volumeID, err)
1✔
601
                return nil, status.Error(codes.NotFound, err.Error())
1✔
602
        }
1✔
603

604
        var exist bool
4✔
605
        secrets := req.GetSecrets()
4✔
606
        if len(secrets) > 0 {
5✔
607
                container, err := getContainerReference(containerName, secrets, d.getStorageEndPointSuffix())
1✔
608
                if err != nil {
2✔
609
                        return nil, status.Error(codes.Internal, err.Error())
1✔
610
                }
1✔
UNCOV
611
                exist, err = container.Exists()
×
UNCOV
612
                if err != nil {
×
UNCOV
613
                        return nil, status.Error(codes.Internal, err.Error())
×
UNCOV
614
                }
×
615
        } else {
3✔
616
                if resourceGroupName == "" {
3✔
617
                        resourceGroupName = d.cloud.ResourceGroup
×
618
                }
×
619
                blobClient, err := d.clientFactory.GetBlobContainerClientForSub(subsID)
3✔
620
                if err != nil {
3✔
621
                        return nil, status.Error(codes.Internal, err.Error())
×
622
                }
×
623

624
                blobContainer, err := blobClient.Get(ctx, resourceGroupName, accountName, containerName)
3✔
625
                if err != nil {
4✔
626
                        return nil, status.Error(codes.Internal, err.Error())
1✔
627
                }
1✔
628
                if blobContainer.ContainerProperties == nil {
3✔
629
                        return nil, status.Errorf(codes.Internal, "ContainerProperties of volume(%s) is nil", volumeID)
1✔
630
                }
1✔
631
                exist = blobContainer.ContainerProperties.Deleted != nil && !*blobContainer.ContainerProperties.Deleted
1✔
632
        }
633
        if !exist {
2✔
634
                return nil, status.Errorf(codes.NotFound, "requested volume(%s) does not exist", volumeID)
1✔
635
        }
1✔
UNCOV
636
        klog.V(2).Infof("ValidateVolumeCapabilities on volume(%s) succeeded", volumeID)
×
UNCOV
637

×
NEW
UNCOV
638
        isOperationSucceeded = true
×
UNCOV
639
        // blob driver supports all AccessModes, no need to check capabilities here
×
640
        return &csi.ValidateVolumeCapabilitiesResponse{
×
641
                Confirmed: &csi.ValidateVolumeCapabilitiesResponse_Confirmed{
×
642
                        VolumeCapabilities: req.GetVolumeCapabilities(),
×
643
                },
×
644
                Message: "",
×
645
        }, nil
×
646
}
647

648
// ControllerModifyVolume modify volume
649
func (d *Driver) ControllerModifyVolume(_ context.Context, _ *csi.ControllerModifyVolumeRequest) (*csi.ControllerModifyVolumeResponse, error) {
×
UNCOV
650
        return nil, status.Error(codes.Unimplemented, "")
×
UNCOV
651
}
×
652

653
func (d *Driver) ControllerPublishVolume(_ context.Context, _ *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
1✔
654
        return nil, status.Error(codes.Unimplemented, "ControllerPublishVolume is not yet implemented")
1✔
655
}
1✔
656

657
func (d *Driver) ControllerUnpublishVolume(_ context.Context, _ *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
1✔
658
        return nil, status.Error(codes.Unimplemented, "ControllerUnpublishVolume is not yet implemented")
1✔
659
}
1✔
660

661
// ControllerGetVolume get volume
662
func (d *Driver) ControllerGetVolume(context.Context, *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
1✔
663
        return nil, status.Error(codes.Unimplemented, "ControllerGetVolume is not yet implemented")
1✔
664
}
1✔
665

666
// GetCapacity returns the capacity of the total available storage pool
667
func (d *Driver) GetCapacity(_ context.Context, _ *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
1✔
668
        return nil, status.Error(codes.Unimplemented, "GetCapacity is not yet implemented")
1✔
669
}
1✔
670

671
// ListVolumes return all available volumes
672
func (d *Driver) ListVolumes(_ context.Context, _ *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
1✔
673
        return nil, status.Error(codes.Unimplemented, "ListVolumes is not yet implemented")
1✔
674
}
1✔
675

676
// CreateSnapshot create snapshot
677
func (d *Driver) CreateSnapshot(_ context.Context, _ *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
1✔
678
        return nil, status.Error(codes.Unimplemented, "CreateSnapshot is not yet implemented")
1✔
679
}
1✔
680

681
// DeleteSnapshot delete snapshot
682
func (d *Driver) DeleteSnapshot(_ context.Context, _ *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
1✔
683
        return nil, status.Error(codes.Unimplemented, "DeleteSnapshot is not yet implemented")
1✔
684
}
1✔
685

686
// ListSnapshots list snapshots
687
func (d *Driver) ListSnapshots(_ context.Context, _ *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
1✔
688
        return nil, status.Error(codes.Unimplemented, "ListSnapshots is not yet implemented")
1✔
689
}
1✔
690

691
// ControllerGetCapabilities returns the capabilities of the Controller plugin
692
func (d *Driver) ControllerGetCapabilities(ctx context.Context, _ *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
1✔
693
        return &csi.ControllerGetCapabilitiesResponse{
1✔
694
                Capabilities: d.Cap,
1✔
695
        }, nil
1✔
696
}
1✔
697

698
// ControllerExpandVolume controller expand volume
699
func (d *Driver) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
4✔
700
        csiMC := csiMetrics.NewCSIMetricContext("controller_expand_volume")
4✔
701
        isOperationSucceeded := false
4✔
702
        defer func() {
8✔
703
                csiMC.Observe(isOperationSucceeded)
4✔
704
        }()
4✔
705

706
        if len(req.GetVolumeId()) == 0 {
5✔
707
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
708
        }
1✔
709

710
        if req.GetCapacityRange() == nil {
4✔
711
                return nil, status.Error(codes.InvalidArgument, "Capacity Range missing in request")
1✔
712
        }
1✔
713

714
        if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_EXPAND_VOLUME); err != nil {
2✔
UNCOV
715
                return nil, status.Errorf(codes.Internal, "invalid expand volume req: %v", req)
×
UNCOV
716
        }
×
717

718
        volSizeBytes := int64(req.GetCapacityRange().GetRequiredBytes())
2✔
719
        requestGiB := int64(util.RoundUpGiB(volSizeBytes))
2✔
720

2✔
721
        if volSizeBytes > containerMaxSize {
3✔
722
                return nil, status.Errorf(codes.OutOfRange, "required bytes (%d) exceeds the maximum supported bytes (%d)", volSizeBytes, containerMaxSize)
1✔
723
        }
1✔
724

725
        klog.V(2).Infof("ControllerExpandVolume(%s) successfully, currentQuota: %d Gi", req.VolumeId, requestGiB)
1✔
726

1✔
727
        isOperationSucceeded = true
1✔
728
        return &csi.ControllerExpandVolumeResponse{CapacityBytes: req.GetCapacityRange().GetRequiredBytes()}, nil
1✔
729
}
730

731
// CreateBlobContainer creates a blob container
732
func (d *Driver) CreateBlobContainer(ctx context.Context, subsID, resourceGroupName, accountName, containerName, storageEndpointSuffix string, secrets map[string]string) error {
10✔
733
        if containerName == "" {
11✔
734
                return fmt.Errorf("containerName is empty")
1✔
735
        }
1✔
736
        return wait.ExponentialBackoff(getBackOff(d.cloud.Config), func() (bool, error) {
18✔
737
                var err error
9✔
738
                if len(secrets) > 0 {
10✔
739
                        container, getErr := getContainerReference(containerName, secrets, storageEndpointSuffix)
1✔
740
                        if getErr != nil {
2✔
741
                                return true, getErr
1✔
742
                        }
1✔
743
                        container.Metadata = map[string]string{createdByMetadata: d.Name}
×
744
                        _, err = container.CreateIfNotExists(&azstorage.CreateContainerOptions{Access: azstorage.ContainerAccessTypePrivate})
×
745
                } else {
8✔
746
                        blobContainer := armstorage.BlobContainer{
8✔
747
                                ContainerProperties: &armstorage.ContainerProperties{
8✔
748
                                        PublicAccess: to.Ptr(armstorage.PublicAccessNone),
8✔
749
                                        Metadata:     map[string]*string{createdByMetadata: to.Ptr(d.Name)},
8✔
750
                                },
8✔
751
                        }
8✔
752
                        var blobClient blobcontainerclient.Interface
8✔
753
                        blobClient, err = d.clientFactory.GetBlobContainerClientForSub(subsID)
8✔
754
                        if err != nil {
8✔
UNCOV
755
                                return true, err
×
UNCOV
756
                        }
×
757
                        azureMC := metrics.NewMetricContext(blobCSIDriverName, "blob_container_create", resourceGroupName, subsID, d.Name)
8✔
758
                        isAzureOpSucceeded := false
8✔
759
                        defer func() {
16✔
760
                                azureMC.ObserveOperationWithResult(isAzureOpSucceeded)
8✔
761
                        }()
8✔
762
                        _, err = blobClient.CreateContainer(ctx, resourceGroupName, accountName, containerName, blobContainer)
8✔
763
                        isAzureOpSucceeded = (err == nil)
8✔
764
                }
765
                if err != nil {
12✔
766
                        if strings.Contains(err.Error(), containerBeingDeletedDataplaneAPIError) ||
4✔
767
                                strings.Contains(err.Error(), containerBeingDeletedManagementAPIError) {
6✔
768
                                klog.Warningf("CreateContainer(%s, %s, %s) failed with error(%v), retry", resourceGroupName, accountName, containerName, err)
2✔
769
                                return false, nil
2✔
770
                        }
2✔
771
                }
772
                return true, err
6✔
773
        })
774
}
775

776
// DeleteBlobContainer deletes a blob container
777
func (d *Driver) DeleteBlobContainer(ctx context.Context, subsID, resourceGroupName, accountName, containerName string, secrets map[string]string) error {
8✔
778
        if containerName == "" {
9✔
779
                return fmt.Errorf("containerName is empty")
1✔
780
        }
1✔
781
        return wait.ExponentialBackoff(getBackOff(d.cloud.Config), func() (bool, error) {
14✔
782
                var err error
7✔
783
                if len(secrets) > 0 {
10✔
784
                        container, getErr := getContainerReference(containerName, secrets, d.getStorageEndPointSuffix())
3✔
785
                        if getErr != nil {
6✔
786
                                return true, getErr
3✔
787
                        }
3✔
788
                        _, err = container.DeleteIfExists(nil)
×
789
                } else {
4✔
790
                        var blobClient blobcontainerclient.Interface
4✔
791
                        blobClient, err = d.clientFactory.GetBlobContainerClientForSub(subsID)
4✔
792
                        if err != nil {
4✔
UNCOV
793
                                return true, err
×
UNCOV
794
                        }
×
795
                        azureMC := metrics.NewMetricContext(blobCSIDriverName, "blob_container_delete", resourceGroupName, subsID, d.Name)
4✔
796
                        isAzureOpSucceeded := false
4✔
797
                        defer func() {
8✔
798
                                azureMC.ObserveOperationWithResult(isAzureOpSucceeded)
4✔
799
                        }()
4✔
800
                        err = blobClient.DeleteContainer(ctx, resourceGroupName, accountName, containerName)
4✔
801
                        isAzureOpSucceeded = (err == nil || strings.Contains(err.Error(), containerBeingDeletedDataplaneAPIError) || strings.Contains(err.Error(), containerBeingDeletedManagementAPIError) || strings.Contains(err.Error(), statusCodeNotFound) || strings.Contains(err.Error(), httpCodeNotFound))
4✔
802
                }
803
                if err != nil {
7✔
804
                        if strings.Contains(err.Error(), containerBeingDeletedDataplaneAPIError) ||
3✔
805
                                strings.Contains(err.Error(), containerBeingDeletedManagementAPIError) ||
3✔
806
                                strings.Contains(err.Error(), statusCodeNotFound) ||
3✔
807
                                strings.Contains(err.Error(), httpCodeNotFound) {
5✔
808
                                klog.Warningf("delete container(%s) on account(%s) failed with error(%v), return as success", containerName, accountName, err)
2✔
809
                                return true, nil
2✔
810
                        }
2✔
811
                        return false, fmt.Errorf("failed to delete container(%s) on account(%s), error: %w", containerName, accountName, err)
1✔
812
                }
813
                return true, err
1✔
814
        })
815
}
816

817
// copyBlobContainer copies source volume content into a destination volume
818
func (d *Driver) copyBlobContainer(ctx context.Context, req *csi.CreateVolumeRequest, dstAccountName string, dstAccountSasToken string, authAzcopyEnv []string, dstContainerName string, secretNamespace string, accountOptions *storage.AccountOptions, storageEndpointSuffix string) error {
7✔
819
        var sourceVolumeID string
7✔
820
        if req.GetVolumeContentSource() != nil && req.GetVolumeContentSource().GetVolume() != nil {
14✔
821
                sourceVolumeID = req.GetVolumeContentSource().GetVolume().GetVolumeId()
7✔
822

7✔
823
        }
7✔
824
        srcResourceGroupName, srcAccountName, srcContainerName, _, srcSubscriptionID, err := GetContainerInfo(sourceVolumeID) //nolint:dogsled
7✔
825
        if err != nil {
9✔
826
                return status.Error(codes.NotFound, err.Error())
2✔
827
        }
2✔
828
        if dstAccountName == "" {
9✔
829
                dstAccountName = srcAccountName
4✔
830
        }
4✔
831
        if srcAccountName == "" || srcContainerName == "" || dstContainerName == "" {
7✔
832
                return fmt.Errorf("srcAccountName(%s) or srcContainerName(%s) or dstContainerName(%s) is empty", srcAccountName, srcContainerName, dstContainerName)
2✔
833
        }
2✔
834
        srcAccountSasToken := dstAccountSasToken
3✔
835
        if srcAccountName != dstAccountName && dstAccountSasToken != "" {
4✔
836
                srcAccountOptions := &storage.AccountOptions{
1✔
837
                        Name:                srcAccountName,
1✔
838
                        ResourceGroup:       srcResourceGroupName,
1✔
839
                        SubscriptionID:      srcSubscriptionID,
1✔
840
                        GetLatestAccountKey: accountOptions.GetLatestAccountKey,
1✔
841
                }
1✔
842
                if srcAccountSasToken, _, err = d.getAzcopyAuth(ctx, srcAccountName, "", storageEndpointSuffix, srcAccountOptions, nil, "", secretNamespace, true); err != nil {
2✔
843
                        return err
1✔
844
                }
1✔
845
        }
846
        srcPath := fmt.Sprintf("https://%s.blob.%s/%s%s", srcAccountName, storageEndpointSuffix, srcContainerName, srcAccountSasToken)
2✔
847
        dstPath := fmt.Sprintf("https://%s.blob.%s/%s%s", dstAccountName, storageEndpointSuffix, dstContainerName, dstAccountSasToken)
2✔
848

2✔
849
        jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
2✔
850
        klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
2✔
851
        switch jobState {
2✔
852
        case util.AzcopyJobError, util.AzcopyJobCompleted, util.AzcopyJobCompletedWithErrors, util.AzcopyJobCompletedWithSkipped, util.AzcopyJobCompletedWithErrorsAndSkipped:
1✔
853
                return err
1✔
854
        case util.AzcopyJobRunning:
1✔
855
                err = wait.PollUntilContextTimeout(ctx, 20*time.Second, time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, true, func(context.Context) (bool, error) {
4✔
856
                        jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
3✔
857
                        klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
3✔
858
                        if err != nil {
3✔
UNCOV
859
                                return false, err
×
UNCOV
860
                        }
×
861
                        if jobState == util.AzcopyJobRunning {
6✔
862
                                return false, nil
3✔
863
                        }
3✔
UNCOV
864
                        return true, nil
×
865
                })
866
        case util.AzcopyJobNotFound:
×
867
                klog.V(2).Infof("copy blob container %s:%s to %s:%s", srcAccountName, srcContainerName, dstAccountName, dstContainerName)
×
UNCOV
868
                execFunc := func() error {
×
UNCOV
869
                        if out, err := d.execAzcopyCopy(srcPath, dstPath, azcopyCloneVolumeOptions, authAzcopyEnv); err != nil {
×
UNCOV
870
                                return fmt.Errorf("exec error: %v, output: %v", err, string(out))
×
871
                        }
×
UNCOV
872
                        return nil
×
873
                }
874
                timeoutFunc := func() error {
×
875
                        _, percent, _ := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
×
876
                        return fmt.Errorf("timeout waiting for copy blob container %s to %s complete, current copy percent: %s%%", srcContainerName, dstContainerName, percent)
×
877
                }
×
878
                err = util.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execFunc, timeoutFunc)
×
879
        }
880
        if err != nil {
2✔
881
                klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", accountOptions.ResourceGroup, dstAccountName, dstContainerName, err)
1✔
882
        } else {
1✔
883
                klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName)
×
884
                if out, err := d.azcopy.CleanJobs(); err != nil {
×
885
                        klog.Warningf("clean azcopy jobs failed with error: %v, output: %s", err, string(out))
×
UNCOV
886
                }
×
887
        }
888
        return err
1✔
889
}
890

891
// copyVolume copies a volume form volume or snapshot, snapshot is not supported now
892
func (d *Driver) copyVolume(ctx context.Context, req *csi.CreateVolumeRequest, accountName string, accountSASToken string, authAzcopyEnv []string, dstContainerName, secretNamespace string, accountOptions *storage.AccountOptions, storageEndpointSuffix string) error {
8✔
893
        vs := req.VolumeContentSource
8✔
894
        switch vs.Type.(type) {
8✔
895
        case *csi.VolumeContentSource_Snapshot:
1✔
896
                return status.Errorf(codes.InvalidArgument, "VolumeContentSource Snapshot is not yet implemented")
1✔
897
        case *csi.VolumeContentSource_Volume:
7✔
898
                return d.copyBlobContainer(ctx, req, accountName, accountSASToken, authAzcopyEnv, dstContainerName, secretNamespace, accountOptions, storageEndpointSuffix)
7✔
UNCOV
899
        default:
×
UNCOV
900
                return status.Errorf(codes.InvalidArgument, "%v is not a proper volume source", vs)
×
901
        }
902
}
903

904
// execAzcopyCopy exec azcopy copy command
UNCOV
905
func (d *Driver) execAzcopyCopy(srcPath, dstPath string, azcopyCopyOptions, authAzcopyEnv []string) ([]byte, error) {
×
906
        cmd := exec.Command("azcopy", "copy", srcPath, dstPath)
×
907
        cmd.Args = append(cmd.Args, azcopyCopyOptions...)
×
UNCOV
908
        if len(authAzcopyEnv) > 0 {
×
UNCOV
909
                cmd.Env = append(os.Environ(), authAzcopyEnv...)
×
UNCOV
910
        }
×
UNCOV
911
        return cmd.CombinedOutput()
×
912
}
913

914
// authorizeAzcopyWithIdentity returns auth env for azcopy using cluster identity
915
func (d *Driver) authorizeAzcopyWithIdentity() ([]string, error) {
6✔
916
        azureAuthConfig := d.cloud.Config.AzureAuthConfig
6✔
917
        azureClientConfig := d.cloud.Config.AzureClientConfig
6✔
918
        var authAzcopyEnv []string
6✔
919
        if azureAuthConfig.UseManagedIdentityExtension {
9✔
920
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyAutoLoginType, MSI))
3✔
921
                if len(azureAuthConfig.UserAssignedIdentityID) > 0 {
4✔
922
                        klog.V(2).Infof("use user assigned managed identity to authorize azcopy")
1✔
923
                        authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyMSIClientID, azureAuthConfig.UserAssignedIdentityID))
1✔
924
                } else {
3✔
925
                        klog.V(2).Infof("use system-assigned managed identity to authorize azcopy")
2✔
926
                }
2✔
927
                return authAzcopyEnv, nil
3✔
928
        }
929
        if len(azureAuthConfig.AADClientSecret) > 0 {
5✔
930
                klog.V(2).Infof("use service principal to authorize azcopy")
2✔
931
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyAutoLoginType, SPN))
2✔
932
                if azureAuthConfig.AADClientID == "" || azureClientConfig.TenantID == "" {
3✔
933
                        return []string{}, fmt.Errorf("AADClientID and TenantID must be set when use service principal")
1✔
934
                }
1✔
935
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopySPAApplicationID, azureAuthConfig.AADClientID))
1✔
936
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopySPAClientSecret, azureAuthConfig.AADClientSecret))
1✔
937
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyTenantID, azureClientConfig.TenantID))
1✔
938
                klog.V(2).Infof("set AZCOPY_SPA_APPLICATION_ID=%s, AZCOPY_TENANT_ID=%s successfully", azureAuthConfig.AADClientID, azureClientConfig.TenantID)
1✔
939

1✔
940
                return authAzcopyEnv, nil
1✔
941
        }
942
        return []string{}, fmt.Errorf("service principle or managed identity are both not set")
1✔
943
}
944

945
// getAzcopyAuth will only generate sas token for azcopy in following conditions:
946
// 1. secrets is not empty
947
// 2. driver is not using managed identity and service principal
948
// 3. parameter useSasToken is true
949
func (d *Driver) getAzcopyAuth(ctx context.Context, accountName, accountKey, storageEndpointSuffix string, accountOptions *storage.AccountOptions, secrets map[string]string, secretName, secretNamespace string, useSasToken bool) (string, []string, error) {
6✔
950
        var authAzcopyEnv []string
6✔
951
        var err error
6✔
952
        if !useSasToken && !d.useDataPlaneAPI(ctx, "", accountName) && len(secrets) == 0 && len(secretName) == 0 {
7✔
953
                // search in cache first
1✔
954
                if cache, err := d.azcopySasTokenCache.Get(ctx, accountName, azcache.CacheReadTypeDefault); err == nil && cache != nil {
1✔
UNCOV
955
                        klog.V(2).Infof("use sas token for account(%s) since this account is found in azcopySasTokenCache", accountName)
×
UNCOV
956
                        return cache.(string), nil, nil
×
UNCOV
957
                }
×
958

959
                authAzcopyEnv, err = d.authorizeAzcopyWithIdentity()
1✔
960
                if err != nil {
1✔
UNCOV
961
                        klog.Warningf("failed to authorize azcopy with identity, error: %v", err)
×
UNCOV
962
                }
×
963
        }
964

965
        if len(secrets) > 0 || len(secretName) > 0 || len(authAzcopyEnv) == 0 || useSasToken {
11✔
966
                if accountKey == "" {
10✔
967
                        if _, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
7✔
968
                                return "", nil, err
2✔
969
                        }
2✔
970
                }
971
                klog.V(2).Infof("generate sas token for account(%s)", accountName)
3✔
972
                sasToken, err := d.generateSASToken(ctx, accountName, accountKey, storageEndpointSuffix, d.sasTokenExpirationMinutes)
3✔
973
                return sasToken, nil, err
3✔
974
        }
975
        return "", authAzcopyEnv, nil
1✔
976
}
977

978
// isValidVolumeCapabilities validates the given VolumeCapability array is valid
979
func isValidVolumeCapabilities(volCaps []*csi.VolumeCapability) error {
31✔
980
        if len(volCaps) == 0 {
33✔
981
                return fmt.Errorf("volume capabilities missing in request")
2✔
982
        }
2✔
983
        for _, c := range volCaps {
58✔
984
                if c.GetBlock() != nil {
31✔
985
                        return fmt.Errorf("block volume capability not supported")
2✔
986
                }
2✔
987
        }
988
        return nil
27✔
989
}
990

991
func parseDays(dayStr string) (int32, error) {
3✔
992
        days, err := strconv.Atoi(dayStr)
3✔
993
        if err != nil {
4✔
994
                return 0, status.Errorf(codes.InvalidArgument, "invalid %s:%s in storage class", softDeleteBlobsField, dayStr)
1✔
995
        }
1✔
996
        if days <= 0 || days > 365 {
3✔
997
                return 0, status.Errorf(codes.InvalidArgument, "invalid %s:%s in storage class, should be in range [1, 365]", softDeleteBlobsField, dayStr)
1✔
998
        }
1✔
999

1000
        return int32(days), nil
1✔
1001
}
1002

1003
// generateSASToken generate a sas token for storage account
1004
func (d *Driver) generateSASToken(ctx context.Context, accountName, accountKey, storageEndpointSuffix string, expiryTime int) (string, error) {
5✔
1005
        // search in cache first
5✔
1006
        cache, err := d.azcopySasTokenCache.Get(ctx, accountName, azcache.CacheReadTypeDefault)
5✔
1007
        if err != nil {
5✔
UNCOV
1008
                return "", fmt.Errorf("get(%s) from azcopySasTokenCache failed with error: %v", accountName, err)
×
UNCOV
1009
        }
×
1010
        if cache != nil {
5✔
UNCOV
1011
                klog.V(2).Infof("use sas token for account(%s) since this account is found in azcopySasTokenCache", accountName)
×
UNCOV
1012
                return cache.(string), nil
×
UNCOV
1013
        }
×
1014

1015
        credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
5✔
1016
        if err != nil {
8✔
1017
                return "", status.Errorf(codes.Internal, "failed to generate sas token in creating new shared key credential, accountName: %s, err: %v", accountName, err)
3✔
1018
        }
3✔
1019
        clientOptions := service.ClientOptions{}
2✔
1020
        clientOptions.InsecureAllowCredentialWithHTTP = true
2✔
1021
        serviceClient, err := service.NewClientWithSharedKeyCredential(fmt.Sprintf("https://%s.blob.%s/", accountName, storageEndpointSuffix), credential, &clientOptions)
2✔
1022
        if err != nil {
2✔
1023
                return "", status.Errorf(codes.Internal, "failed to generate sas token in creating new client with shared key credential, accountName: %s, err: %v", accountName, err)
×
1024
        }
×
1025
        sasURL, err := serviceClient.GetSASURL(
2✔
1026
                sas.AccountResourceTypes{Object: true, Service: false, Container: true},
2✔
1027
                sas.AccountPermissions{Read: true, List: true, Write: true},
2✔
1028
                time.Now().Add(time.Duration(expiryTime)*time.Minute),
2✔
1029
                &service.GetSASURLOptions{StartTime: to.Ptr(time.Now())},
2✔
1030
        )
2✔
1031
        if err != nil {
2✔
UNCOV
1032
                return "", err
×
UNCOV
1033
        }
×
1034
        u, err := url.Parse(sasURL)
2✔
1035
        if err != nil {
2✔
1036
                return "", err
×
UNCOV
1037
        }
×
1038
        sasToken := "?" + u.RawQuery
2✔
1039
        d.azcopySasTokenCache.Set(accountName, sasToken)
2✔
1040
        return sasToken, nil
2✔
1041
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc