• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubernetes-sigs / blob-csi-driver / 9672626509

26 Jun 2024 02:31AM UTC coverage: 71.895%. Remained the same
9672626509

push

github

web-flow
Merge pull request #1453 from andyzhangx/cut-v1.23.6

doc: cut v1.23.6 release

2049 of 2850 relevant lines covered (71.89%)

6.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.06
/pkg/blob/controllerserver.go
1
/*
2
Copyright 2017 The Kubernetes Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package blob
18

19
import (
20
        "context"
21
        "fmt"
22
        "net/url"
23
        "os"
24
        "os/exec"
25
        "strconv"
26
        "strings"
27
        "time"
28

29
        "google.golang.org/grpc/codes"
30
        "google.golang.org/grpc/status"
31

32
        "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
33
        "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
34
        "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
35
        "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
36
        "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2021-09-01/storage"
37
        azstorage "github.com/Azure/azure-sdk-for-go/storage"
38
        "github.com/container-storage-interface/spec/lib/go/csi"
39

40
        "k8s.io/apimachinery/pkg/util/wait"
41
        "k8s.io/klog/v2"
42
        "k8s.io/utils/pointer"
43

44
        "sigs.k8s.io/blob-csi-driver/pkg/util"
45
        azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache"
46
        "sigs.k8s.io/cloud-provider-azure/pkg/metrics"
47
        "sigs.k8s.io/cloud-provider-azure/pkg/provider"
48
        azure "sigs.k8s.io/cloud-provider-azure/pkg/provider"
49
)
50

51
// privateEndpoint is the networkEndpointType value (matched case-insensitively
// in CreateVolume) that requests a private endpoint for the storage account.
const privateEndpoint = "privateendpoint"

// azcopy authentication identifiers.
// NOTE(review): presumably the AZCOPY_* strings are environment variable names
// read by azcopy, and MSI/SPN are the supported auto-login types — confirm
// against the code that builds the azcopy environment.
const (
	azcopyAutoLoginType    = "AZCOPY_AUTO_LOGIN_TYPE"
	azcopySPAApplicationID = "AZCOPY_SPA_APPLICATION_ID"
	azcopySPAClientSecret  = "AZCOPY_SPA_CLIENT_SECRET"
	azcopyTenantID         = "AZCOPY_TENANT_ID"
	azcopyMSIClientID      = "AZCOPY_MSI_CLIENT_ID"

	// MSI is the literal string "MSI".
	MSI = "MSI"
	// SPN is the literal string "SPN".
	SPN = "SPN"
)

// authorizationPermissionMismatch is the literal "AuthorizationPermissionMismatch".
// NOTE(review): looks like a storage service error code matched against azcopy
// or SDK output — verify at the call site.
const authorizationPermissionMismatch = "AuthorizationPermissionMismatch"

// waitForAzCopyInterval is a fixed two-second duration.
// NOTE(review): presumably the polling interval while waiting on an azcopy job — confirm.
const waitForAzCopyInterval = 2 * time.Second
65

66
// CreateVolume provisions a volume
67
func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
21✔
68
        if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
21✔
69
                klog.Errorf("invalid create volume req: %v", req)
×
70
                return nil, err
×
71
        }
×
72

73
        volName := req.GetName()
21✔
74
        if len(volName) == 0 {
22✔
75
                return nil, status.Error(codes.InvalidArgument, "CreateVolume Name must be provided")
1✔
76
        }
1✔
77

78
        if err := isValidVolumeCapabilities(req.GetVolumeCapabilities()); err != nil {
22✔
79
                return nil, status.Error(codes.InvalidArgument, err.Error())
2✔
80
        }
2✔
81

82
        if acquired := d.volumeLocks.TryAcquire(volName); !acquired {
18✔
83
                // logging the job status if it's volume cloning
×
84
                if req.GetVolumeContentSource() != nil {
×
85
                        jobState, percent, err := d.azcopy.GetAzcopyJob(volName, []string{})
×
86
                        klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
×
87
                }
×
88
                return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName)
×
89
        }
90
        defer d.volumeLocks.Release(volName)
18✔
91

18✔
92
        volSizeBytes := int64(req.GetCapacityRange().GetRequiredBytes())
18✔
93
        requestGiB := int(util.RoundUpGiB(volSizeBytes))
18✔
94

18✔
95
        parameters := req.GetParameters()
18✔
96
        if parameters == nil {
19✔
97
                parameters = make(map[string]string)
1✔
98
        }
1✔
99
        var storageAccountType, subsID, resourceGroup, location, account, containerName, containerNamePrefix, protocol, customTags, secretName, secretNamespace, pvcNamespace string
18✔
100
        var isHnsEnabled, requireInfraEncryption, enableBlobVersioning, createPrivateEndpoint, enableNfsV3 *bool
18✔
101
        var vnetResourceGroup, vnetName, subnetName, accessTier, networkEndpointType, storageEndpointSuffix string
18✔
102
        var matchTags, useDataPlaneAPI, getLatestAccountKey bool
18✔
103
        var softDeleteBlobs, softDeleteContainers int32
18✔
104
        var vnetResourceIDs []string
18✔
105
        var err error
18✔
106
        // set allowBlobPublicAccess as false by default
18✔
107
        allowBlobPublicAccess := pointer.Bool(false)
18✔
108

18✔
109
        containerNameReplaceMap := map[string]string{}
18✔
110

18✔
111
        // store account key to k8s secret by default
18✔
112
        storeAccountKey := true
18✔
113

18✔
114
        // Apply ProvisionerParameters (case-insensitive). We leave validation of
18✔
115
        // the values to the cloud provider.
18✔
116
        for k, v := range parameters {
112✔
117
                switch strings.ToLower(k) {
94✔
118
                case skuNameField:
9✔
119
                        storageAccountType = v
9✔
120
                case storageAccountTypeField:
10✔
121
                        storageAccountType = v
10✔
122
                case locationField:
9✔
123
                        location = v
9✔
124
                case storageAccountField:
10✔
125
                        account = v
10✔
126
                case subscriptionIDField:
×
127
                        subsID = v
×
128
                case resourceGroupField:
9✔
129
                        resourceGroup = v
9✔
130
                case containerNameField:
10✔
131
                        containerName = v
10✔
132
                case containerNamePrefixField:
2✔
133
                        containerNamePrefix = v
2✔
134
                case protocolField:
9✔
135
                        protocol = v
9✔
136
                case tagsField:
1✔
137
                        customTags = v
1✔
138
                case matchTagsField:
1✔
139
                        matchTags = strings.EqualFold(v, trueValue)
1✔
140
                case secretNameField:
×
141
                        secretName = v
×
142
                case secretNamespaceField:
×
143
                        secretNamespace = v
×
144
                case isHnsEnabledField:
×
145
                        if strings.EqualFold(v, trueValue) {
×
146
                                isHnsEnabled = pointer.Bool(true)
×
147
                        }
×
148
                case softDeleteBlobsField:
×
149
                        days, err := parseDays(v)
×
150
                        if err != nil {
×
151
                                return nil, err
×
152
                        }
×
153
                        softDeleteBlobs = days
×
154
                case softDeleteContainersField:
×
155
                        days, err := parseDays(v)
×
156
                        if err != nil {
×
157
                                return nil, err
×
158
                        }
×
159
                        softDeleteContainers = days
×
160
                case enableBlobVersioningField:
×
161
                        enableBlobVersioning = pointer.Bool(strings.EqualFold(v, trueValue))
×
162
                case storeAccountKeyField:
3✔
163
                        if strings.EqualFold(v, falseValue) {
5✔
164
                                storeAccountKey = false
2✔
165
                        }
2✔
166
                case getLatestAccountKeyField:
1✔
167
                        if getLatestAccountKey, err = strconv.ParseBool(v); err != nil {
2✔
168
                                return nil, status.Errorf(codes.InvalidArgument, "invalid %s: %s in volume context", getLatestAccountKeyField, v)
1✔
169
                        }
1✔
170
                case allowBlobPublicAccessField:
×
171
                        if strings.EqualFold(v, trueValue) {
×
172
                                allowBlobPublicAccess = pointer.Bool(true)
×
173
                        }
×
174
                case requireInfraEncryptionField:
×
175
                        if strings.EqualFold(v, trueValue) {
×
176
                                requireInfraEncryption = pointer.Bool(true)
×
177
                        }
×
178
                case pvcNamespaceKey:
×
179
                        pvcNamespace = v
×
180
                        containerNameReplaceMap[pvcNamespaceMetadata] = v
×
181
                case pvcNameKey:
×
182
                        containerNameReplaceMap[pvcNameMetadata] = v
×
183
                case pvNameKey:
×
184
                        containerNameReplaceMap[pvNameMetadata] = v
×
185
                case serverNameField:
×
186
                case storageAuthTypeField:
1✔
187
                case storageIentityClientIDField:
1✔
188
                case storageIdentityObjectIDField:
1✔
189
                case storageIdentityResourceIDField:
1✔
190
                case msiEndpointField:
1✔
191
                case storageAADEndpointField:
1✔
192
                        // no op, only used in NodeStageVolume
193
                case storageEndpointSuffixField:
×
194
                        storageEndpointSuffix = v
×
195
                case vnetResourceGroupField:
×
196
                        vnetResourceGroup = v
×
197
                case vnetNameField:
×
198
                        vnetName = v
×
199
                case subnetNameField:
1✔
200
                        subnetName = v
1✔
201
                case accessTierField:
×
202
                        accessTier = v
×
203
                case networkEndpointTypeField:
1✔
204
                        networkEndpointType = v
1✔
205
                case mountPermissionsField:
10✔
206
                        // only do validations here, used in NodeStageVolume, NodePublishVolume
10✔
207
                        if v != "" {
20✔
208
                                if _, err := strconv.ParseUint(v, 8, 32); err != nil {
11✔
209
                                        return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid mountPermissions %s in storage class", v))
1✔
210
                                }
1✔
211
                        }
212
                case useDataPlaneAPIField:
1✔
213
                        useDataPlaneAPI = strings.EqualFold(v, trueValue)
1✔
214
                default:
1✔
215
                        return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid parameter %q in storage class", k))
1✔
216
                }
217
        }
218

219
        if pointer.BoolDeref(enableBlobVersioning, false) {
15✔
220
                if isNFSProtocol(protocol) || pointer.BoolDeref(isHnsEnabled, false) {
×
221
                        return nil, status.Errorf(codes.InvalidArgument, "enableBlobVersioning is not supported for NFS protocol or HNS enabled account")
×
222
                }
×
223
        }
224

225
        if matchTags && account != "" {
16✔
226
                return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("matchTags must set as false when storageAccount(%s) is provided", account))
1✔
227
        }
1✔
228

229
        if resourceGroup == "" {
20✔
230
                resourceGroup = d.cloud.ResourceGroup
6✔
231
        }
6✔
232

233
        if secretNamespace == "" {
28✔
234
                if pvcNamespace == "" {
28✔
235
                        secretNamespace = defaultNamespace
14✔
236
                } else {
14✔
237
                        secretNamespace = pvcNamespace
×
238
                }
×
239
        }
240

241
        if protocol == "" {
19✔
242
                protocol = Fuse
5✔
243
        }
5✔
244
        if !isSupportedProtocol(protocol) {
15✔
245
                return nil, status.Errorf(codes.InvalidArgument, "protocol(%s) is not supported, supported protocol list: %v", protocol, supportedProtocolList)
1✔
246
        }
1✔
247
        if !isSupportedAccessTier(accessTier) {
13✔
248
                return nil, status.Errorf(codes.InvalidArgument, "accessTier(%s) is not supported, supported AccessTier list: %v", accessTier, storage.PossibleAccessTierValues())
×
249
        }
×
250

251
        if containerName != "" && containerNamePrefix != "" {
14✔
252
                return nil, status.Errorf(codes.InvalidArgument, "containerName(%s) and containerNamePrefix(%s) could not be specified together", containerName, containerNamePrefix)
1✔
253
        }
1✔
254
        if !isSupportedContainerNamePrefix(containerNamePrefix) {
13✔
255
                return nil, status.Errorf(codes.InvalidArgument, "containerNamePrefix(%s) can only contain lowercase letters, numbers, hyphens, and length should be less than 21", containerNamePrefix)
1✔
256
        }
1✔
257

258
        enableHTTPSTrafficOnly := true
11✔
259
        if strings.EqualFold(networkEndpointType, privateEndpoint) {
12✔
260
                if strings.Contains(subnetName, ",") {
2✔
261
                        return nil, status.Errorf(codes.InvalidArgument, "subnetName(%s) can only contain one subnet for private endpoint", subnetName)
1✔
262
                }
1✔
263
                createPrivateEndpoint = pointer.BoolPtr(true)
×
264
        }
265
        accountKind := string(storage.KindStorageV2)
10✔
266
        if isNFSProtocol(protocol) {
11✔
267
                isHnsEnabled = pointer.Bool(true)
1✔
268
                enableNfsV3 = pointer.Bool(true)
1✔
269
                // NFS protocol does not need account key
1✔
270
                storeAccountKey = false
1✔
271
                if !pointer.BoolDeref(createPrivateEndpoint, false) {
2✔
272
                        // set VirtualNetworkResourceIDs for storage account firewall setting
1✔
273
                        subnets := strings.Split(subnetName, ",")
1✔
274
                        for _, subnet := range subnets {
2✔
275
                                subnet = strings.TrimSpace(subnet)
1✔
276
                                vnetResourceID := d.getSubnetResourceID(vnetResourceGroup, vnetName, subnet)
1✔
277
                                klog.V(2).Infof("set vnetResourceID(%s) for NFS protocol", vnetResourceID)
1✔
278
                                vnetResourceIDs = []string{vnetResourceID}
1✔
279
                                if err := d.updateSubnetServiceEndpoints(ctx, vnetResourceGroup, vnetName, subnet); err != nil {
2✔
280
                                        return nil, status.Errorf(codes.Internal, "update service endpoints failed with error: %v", err)
1✔
281
                                }
1✔
282
                        }
283
                }
284
        }
285

286
        if strings.HasPrefix(strings.ToLower(storageAccountType), "premium") {
10✔
287
                accountKind = string(storage.KindBlockBlobStorage)
1✔
288
        }
1✔
289
        if IsAzureStackCloud(d.cloud) {
10✔
290
                accountKind = string(storage.KindStorage)
1✔
291
                if storageAccountType != "" && storageAccountType != string(storage.SkuNameStandardLRS) && storageAccountType != string(storage.SkuNamePremiumLRS) {
2✔
292
                        return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("Invalid skuName value: %s, as Azure Stack only supports %s and %s Storage Account types.", storageAccountType, storage.SkuNamePremiumLRS, storage.SkuNameStandardLRS))
1✔
293
                }
1✔
294
        }
295

296
        tags, err := util.ConvertTagsToMap(customTags)
8✔
297
        if err != nil {
9✔
298
                return nil, status.Errorf(codes.InvalidArgument, err.Error())
1✔
299
        }
1✔
300

301
        if strings.TrimSpace(storageEndpointSuffix) == "" {
14✔
302
                storageEndpointSuffix = d.getStorageEndPointSuffix()
7✔
303
        }
7✔
304

305
        accountOptions := &azure.AccountOptions{
7✔
306
                Name:                            account,
7✔
307
                Type:                            storageAccountType,
7✔
308
                Kind:                            accountKind,
7✔
309
                SubscriptionID:                  subsID,
7✔
310
                ResourceGroup:                   resourceGroup,
7✔
311
                Location:                        location,
7✔
312
                EnableHTTPSTrafficOnly:          enableHTTPSTrafficOnly,
7✔
313
                VirtualNetworkResourceIDs:       vnetResourceIDs,
7✔
314
                Tags:                            tags,
7✔
315
                MatchTags:                       matchTags,
7✔
316
                IsHnsEnabled:                    isHnsEnabled,
7✔
317
                EnableNfsV3:                     enableNfsV3,
7✔
318
                AllowBlobPublicAccess:           allowBlobPublicAccess,
7✔
319
                RequireInfrastructureEncryption: requireInfraEncryption,
7✔
320
                VNetResourceGroup:               vnetResourceGroup,
7✔
321
                VNetName:                        vnetName,
7✔
322
                SubnetName:                      subnetName,
7✔
323
                AccessTier:                      accessTier,
7✔
324
                CreatePrivateEndpoint:           createPrivateEndpoint,
7✔
325
                StorageType:                     provider.StorageTypeBlob,
7✔
326
                StorageEndpointSuffix:           storageEndpointSuffix,
7✔
327
                EnableBlobVersioning:            enableBlobVersioning,
7✔
328
                SoftDeleteBlobs:                 softDeleteBlobs,
7✔
329
                SoftDeleteContainers:            softDeleteContainers,
7✔
330
                GetLatestAccountKey:             getLatestAccountKey,
7✔
331
        }
7✔
332

7✔
333
        var volumeID string
7✔
334
        requestName := "controller_create_volume"
7✔
335
        if req.GetVolumeContentSource() != nil {
9✔
336
                switch req.VolumeContentSource.Type.(type) {
2✔
337
                case *csi.VolumeContentSource_Snapshot:
1✔
338
                        requestName = "controller_create_volume_from_snapshot"
1✔
339
                case *csi.VolumeContentSource_Volume:
1✔
340
                        requestName = "controller_create_volume_from_volume"
1✔
341
                }
342
        }
343
        mc := metrics.NewMetricContext(blobCSIDriverName, requestName, d.cloud.ResourceGroup, d.cloud.SubscriptionID, d.Name)
7✔
344
        isOperationSucceeded := false
7✔
345
        defer func() {
14✔
346
                mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
7✔
347
        }()
7✔
348

349
        var accountKey string
7✔
350
        accountName := account
7✔
351
        secrets := req.GetSecrets()
7✔
352
        if len(secrets) == 0 && accountName == "" {
8✔
353
                if v, ok := d.volMap.Load(volName); ok {
1✔
354
                        accountName = v.(string)
×
355
                } else {
1✔
356
                        lockKey := fmt.Sprintf("%s%s%s%s%s%v", storageAccountType, accountKind, resourceGroup, location, protocol, pointer.BoolDeref(createPrivateEndpoint, false))
1✔
357
                        // search in cache first
1✔
358
                        cache, err := d.accountSearchCache.Get(lockKey, azcache.CacheReadTypeDefault)
1✔
359
                        if err != nil {
1✔
360
                                return nil, status.Errorf(codes.Internal, err.Error())
×
361
                        }
×
362
                        if cache != nil {
1✔
363
                                accountName = cache.(string)
×
364
                        } else {
1✔
365
                                d.volLockMap.LockEntry(lockKey)
1✔
366
                                err = wait.ExponentialBackoff(d.cloud.RequestBackoff(), func() (bool, error) {
2✔
367
                                        var retErr error
1✔
368
                                        accountName, accountKey, retErr = d.cloud.EnsureStorageAccount(ctx, accountOptions, protocol)
1✔
369
                                        if isRetriableError(retErr) {
1✔
370
                                                klog.Warningf("EnsureStorageAccount(%s) failed with error(%v), waiting for retrying", account, retErr)
×
371
                                                return false, nil
×
372
                                        }
×
373
                                        return true, retErr
1✔
374
                                })
375
                                d.volLockMap.UnlockEntry(lockKey)
1✔
376
                                if err != nil {
2✔
377
                                        return nil, status.Errorf(codes.Internal, "ensure storage account failed with %v", err)
1✔
378
                                }
1✔
379
                                d.accountSearchCache.Set(lockKey, accountName)
×
380
                                d.volMap.Store(volName, accountName)
×
381
                        }
382
                }
383
        }
384

385
        if pointer.BoolDeref(createPrivateEndpoint, false) && isNFSProtocol(protocol) {
6✔
386
                // As for blobfuse/blobfuse2, serverName, i.e.,AZURE_STORAGE_BLOB_ENDPOINT env variable can't include
×
387
                // "privatelink", issue: https://github.com/Azure/azure-storage-fuse/issues/1014
×
388
                //
×
389
                // And use public endpoint will be befine to blobfuse/blobfuse2, because it will be resolved to private endpoint
×
390
                // by private dns zone, which includes CNAME record, documented here:
×
391
                // https://learn.microsoft.com/en-us/azure/storage/common/storage-private-endpoints?toc=%2Fazure%2Fstorage%2Fblobs%2Ftoc.json&bc=%2Fazure%2Fstorage%2Fblobs%2Fbreadcrumb%2Ftoc.json#dns-changes-for-private-endpoints
×
392
                setKeyValueInMap(parameters, serverNameField, fmt.Sprintf("%s.privatelink.blob.%s", accountName, storageEndpointSuffix))
×
393
        }
×
394

395
        accountOptions.Name = accountName
6✔
396
        if len(secrets) == 0 && useDataPlaneAPI {
7✔
397
                if accountKey == "" {
2✔
398
                        if accountName, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
2✔
399
                                return nil, status.Errorf(codes.Internal, "failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
1✔
400
                        }
1✔
401
                }
402
                secrets = createStorageAccountSecret(accountName, accountKey)
×
403
        }
404

405
        // replace pv/pvc name namespace metadata in subDir
406
        containerName = replaceWithMap(containerName, containerNameReplaceMap)
5✔
407
        validContainerName := containerName
5✔
408
        if validContainerName == "" {
5✔
409
                validContainerName = volName
×
410
                if containerNamePrefix != "" {
×
411
                        validContainerName = containerNamePrefix + "-" + volName
×
412
                }
×
413
                validContainerName = getValidContainerName(validContainerName, protocol)
×
414
                setKeyValueInMap(parameters, containerNameField, validContainerName)
×
415
        }
416

417
        if req.GetVolumeContentSource() != nil {
7✔
418
                accountSASToken, authAzcopyEnv, err := d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace)
2✔
419
                if err != nil {
2✔
420
                        return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
×
421
                }
×
422
                if err := d.copyVolume(req, accountSASToken, authAzcopyEnv, validContainerName, storageEndpointSuffix); err != nil {
4✔
423
                        return nil, err
2✔
424
                }
2✔
425
        } else {
3✔
426
                klog.V(2).Infof("begin to create container(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d)", validContainerName, accountName, storageAccountType, subsID, resourceGroup, location, requestGiB)
3✔
427
                if err := d.CreateBlobContainer(ctx, subsID, resourceGroup, accountName, validContainerName, secrets); err != nil {
4✔
428
                        return nil, status.Errorf(codes.Internal, "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v", validContainerName, accountName, storageAccountType, resourceGroup, location, requestGiB, err)
1✔
429
                }
1✔
430
        }
431

432
        if storeAccountKey && len(req.GetSecrets()) == 0 {
4✔
433
                if accountKey == "" {
4✔
434
                        if accountName, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
3✔
435
                                return nil, status.Errorf(codes.Internal, "failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
1✔
436
                        }
1✔
437
                }
438

439
                secretName, err := setAzureCredentials(ctx, d.cloud.KubeClient, accountName, accountKey, secretNamespace)
1✔
440
                if err != nil {
1✔
441
                        return nil, status.Errorf(codes.Internal, "failed to store storage account key: %v", err)
×
442
                }
×
443
                if secretName != "" {
1✔
444
                        klog.V(2).Infof("store account key to k8s secret(%v) in %s namespace", secretName, secretNamespace)
×
445
                }
×
446
        }
447

448
        var uuid string
1✔
449
        if containerName != "" {
2✔
450
                // add volume name as suffix to differentiate volumeID since "containerName" is specified
1✔
451
                // not necessary for dynamic container name creation since volumeID already contains volume name
1✔
452
                uuid = volName
1✔
453
        }
1✔
454
        volumeID = fmt.Sprintf(volumeIDTemplate, resourceGroup, accountName, validContainerName, uuid, secretNamespace, subsID)
1✔
455
        klog.V(2).Infof("create container %s on storage account %s successfully", validContainerName, accountName)
1✔
456

1✔
457
        if useDataPlaneAPI {
1✔
458
                d.dataPlaneAPIVolCache.Set(volumeID, "")
×
459
                d.dataPlaneAPIVolCache.Set(accountName, "")
×
460
        }
×
461

462
        isOperationSucceeded = true
1✔
463
        // reset secretNamespace field in VolumeContext
1✔
464
        setKeyValueInMap(parameters, secretNamespaceField, secretNamespace)
1✔
465
        return &csi.CreateVolumeResponse{
1✔
466
                Volume: &csi.Volume{
1✔
467
                        VolumeId:      volumeID,
1✔
468
                        CapacityBytes: req.GetCapacityRange().GetRequiredBytes(),
1✔
469
                        VolumeContext: parameters,
1✔
470
                        ContentSource: req.GetVolumeContentSource(),
1✔
471
                },
1✔
472
        }, nil
1✔
473
}
474

475
// DeleteVolume delete a volume
476
func (d *Driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
5✔
477
        volumeID := req.GetVolumeId()
5✔
478
        if len(volumeID) == 0 {
6✔
479
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
480
        }
1✔
481

482
        if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
4✔
483
                return nil, status.Errorf(codes.Internal, "invalid delete volume req: %v", req)
×
484
        }
×
485

486
        if acquired := d.volumeLocks.TryAcquire(volumeID); !acquired {
4✔
487
                return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
×
488
        }
×
489
        defer d.volumeLocks.Release(volumeID)
4✔
490

4✔
491
        resourceGroupName, accountName, containerName, _, subsID, err := GetContainerInfo(volumeID)
4✔
492
        if err != nil {
5✔
493
                // According to CSI Driver Sanity Tester, should succeed when an invalid volume id is used
1✔
494
                klog.Errorf("GetContainerInfo(%s) in DeleteVolume failed with error: %v", volumeID, err)
1✔
495
                return &csi.DeleteVolumeResponse{}, nil
1✔
496
        }
1✔
497

498
        secrets := req.GetSecrets()
3✔
499
        if len(secrets) == 0 && d.useDataPlaneAPI(volumeID, accountName) {
4✔
500
                _, accountName, accountKey, _, _, err := d.GetAuthEnv(ctx, volumeID, "", nil, secrets)
1✔
501
                if err != nil {
2✔
502
                        return nil, status.Errorf(codes.Internal, "GetAuthEnv(%s) failed with %v", volumeID, err)
1✔
503
                }
1✔
504
                if accountName != "" && accountKey != "" {
×
505
                        secrets = createStorageAccountSecret(accountName, accountKey)
×
506
                }
×
507
        }
508

509
        mc := metrics.NewMetricContext(blobCSIDriverName, "controller_delete_volume", d.cloud.ResourceGroup, d.cloud.SubscriptionID, d.Name)
2✔
510
        isOperationSucceeded := false
2✔
511
        defer func() {
4✔
512
                mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
2✔
513
        }()
2✔
514

515
        if resourceGroupName == "" {
3✔
516
                resourceGroupName = d.cloud.ResourceGroup
1✔
517
        }
1✔
518
        klog.V(2).Infof("deleting container(%s) rg(%s) account(%s) volumeID(%s)", containerName, resourceGroupName, accountName, volumeID)
2✔
519
        if err := d.DeleteBlobContainer(ctx, subsID, resourceGroupName, accountName, containerName, secrets); err != nil {
4✔
520
                return nil, status.Errorf(codes.Internal, "failed to delete container(%s) under rg(%s) account(%s) volumeID(%s), error: %v", containerName, resourceGroupName, accountName, volumeID, err)
2✔
521
        }
2✔
522

523
        isOperationSucceeded = true
×
524
        klog.V(2).Infof("container(%s) under rg(%s) account(%s) volumeID(%s) is deleted successfully", containerName, resourceGroupName, accountName, volumeID)
×
525
        return &csi.DeleteVolumeResponse{}, nil
×
526
}
527

528
// ValidateVolumeCapabilities return the capabilities of the volume
529
func (d *Driver) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
8✔
530
        volumeID := req.GetVolumeId()
8✔
531
        if len(volumeID) == 0 {
9✔
532
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
533
        }
1✔
534
        if err := isValidVolumeCapabilities(req.GetVolumeCapabilities()); err != nil {
9✔
535
                return nil, status.Error(codes.InvalidArgument, err.Error())
2✔
536
        }
2✔
537

538
        resourceGroupName, accountName, containerName, _, subsID, err := GetContainerInfo(volumeID)
5✔
539
        if err != nil {
6✔
540
                klog.Errorf("GetContainerInfo(%s) in ValidateVolumeCapabilities failed with error: %v", volumeID, err)
1✔
541
                return nil, status.Error(codes.NotFound, err.Error())
1✔
542
        }
1✔
543

544
        var exist bool
4✔
545
        secrets := req.GetSecrets()
4✔
546
        if len(secrets) > 0 {
5✔
547
                container, err := getContainerReference(containerName, secrets, d.getCloudEnvironment())
1✔
548
                if err != nil {
2✔
549
                        return nil, status.Error(codes.Internal, err.Error())
1✔
550
                }
1✔
551
                exist, err = container.Exists()
×
552
                if err != nil {
×
553
                        return nil, status.Error(codes.Internal, err.Error())
×
554
                }
×
555
        } else {
3✔
556
                if resourceGroupName == "" {
3✔
557
                        resourceGroupName = d.cloud.ResourceGroup
×
558
                }
×
559
                blobContainer, retryErr := d.cloud.BlobClient.GetContainer(ctx, subsID, resourceGroupName, accountName, containerName)
3✔
560
                err = retryErr.Error()
3✔
561
                if err != nil {
4✔
562
                        return nil, status.Error(codes.Internal, err.Error())
1✔
563
                }
1✔
564
                if blobContainer.ContainerProperties == nil {
3✔
565
                        return nil, status.Errorf(codes.Internal, "ContainerProperties of volume(%s) is nil", volumeID)
1✔
566
                }
1✔
567
                exist = blobContainer.ContainerProperties.Deleted != nil && !*blobContainer.ContainerProperties.Deleted
1✔
568
        }
569
        if !exist {
2✔
570
                return nil, status.Errorf(codes.NotFound, "requested volume(%s) does not exist", volumeID)
1✔
571
        }
1✔
572
        klog.V(2).Infof("ValidateVolumeCapabilities on volume(%s) succeeded", volumeID)
×
573

×
574
        // blob driver supports all AccessModes, no need to check capabilities here
×
575
        return &csi.ValidateVolumeCapabilitiesResponse{
×
576
                Confirmed: &csi.ValidateVolumeCapabilitiesResponse_Confirmed{
×
577
                        VolumeCapabilities: req.GetVolumeCapabilities(),
×
578
                },
×
579
                Message: "",
×
580
        }, nil
×
581
}
582

583
func (d *Driver) ControllerPublishVolume(_ context.Context, _ *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
1✔
584
        return nil, status.Error(codes.Unimplemented, "ControllerPublishVolume is not yet implemented")
1✔
585
}
1✔
586

587
func (d *Driver) ControllerUnpublishVolume(_ context.Context, _ *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
1✔
588
        return nil, status.Error(codes.Unimplemented, "ControllerUnpublishVolume is not yet implemented")
1✔
589
}
1✔
590

591
// ControllerGetVolume get volume
592
func (d *Driver) ControllerGetVolume(context.Context, *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
1✔
593
        return nil, status.Error(codes.Unimplemented, "ControllerGetVolume is not yet implemented")
1✔
594
}
1✔
595

596
// GetCapacity returns the capacity of the total available storage pool
597
func (d *Driver) GetCapacity(_ context.Context, _ *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
1✔
598
        return nil, status.Error(codes.Unimplemented, "GetCapacity is not yet implemented")
1✔
599
}
1✔
600

601
// ListVolumes return all available volumes
602
func (d *Driver) ListVolumes(_ context.Context, _ *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
1✔
603
        return nil, status.Error(codes.Unimplemented, "ListVolumes is not yet implemented")
1✔
604
}
1✔
605

606
// CreateSnapshot create snapshot
607
func (d *Driver) CreateSnapshot(_ context.Context, _ *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
1✔
608
        return nil, status.Error(codes.Unimplemented, "CreateSnapshot is not yet implemented")
1✔
609
}
1✔
610

611
// DeleteSnapshot delete snapshot
612
func (d *Driver) DeleteSnapshot(_ context.Context, _ *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
1✔
613
        return nil, status.Error(codes.Unimplemented, "DeleteSnapshot is not yet implemented")
1✔
614
}
1✔
615

616
// ListSnapshots list snapshots
617
func (d *Driver) ListSnapshots(_ context.Context, _ *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
1✔
618
        return nil, status.Error(codes.Unimplemented, "ListSnapshots is not yet implemented")
1✔
619
}
1✔
620

621
// ControllerGetCapabilities returns the capabilities of the Controller plugin
622
func (d *Driver) ControllerGetCapabilities(_ context.Context, _ *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
1✔
623
        return &csi.ControllerGetCapabilitiesResponse{
1✔
624
                Capabilities: d.Cap,
1✔
625
        }, nil
1✔
626
}
1✔
627

628
// ControllerExpandVolume controller expand volume
629
func (d *Driver) ControllerExpandVolume(_ context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
4✔
630
        if len(req.GetVolumeId()) == 0 {
5✔
631
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
632
        }
1✔
633

634
        if req.GetCapacityRange() == nil {
4✔
635
                return nil, status.Error(codes.InvalidArgument, "Capacity Range missing in request")
1✔
636
        }
1✔
637

638
        if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_EXPAND_VOLUME); err != nil {
2✔
639
                return nil, status.Errorf(codes.Internal, "invalid expand volume req: %v", req)
×
640
        }
×
641

642
        volSizeBytes := int64(req.GetCapacityRange().GetRequiredBytes())
2✔
643
        requestGiB := int64(util.RoundUpGiB(volSizeBytes))
2✔
644

2✔
645
        if volSizeBytes > containerMaxSize {
3✔
646
                return nil, status.Errorf(codes.OutOfRange, "required bytes (%d) exceeds the maximum supported bytes (%d)", volSizeBytes, containerMaxSize)
1✔
647
        }
1✔
648

649
        klog.V(2).Infof("ControllerExpandVolume(%s) successfully, currentQuota: %d Gi", req.VolumeId, requestGiB)
1✔
650

1✔
651
        return &csi.ControllerExpandVolumeResponse{CapacityBytes: req.GetCapacityRange().GetRequiredBytes()}, nil
1✔
652
}
653

654
// CreateBlobContainer creates a blob container
655
func (d *Driver) CreateBlobContainer(ctx context.Context, subsID, resourceGroupName, accountName, containerName string, secrets map[string]string) error {
9✔
656
        if containerName == "" {
10✔
657
                return fmt.Errorf("containerName is empty")
1✔
658
        }
1✔
659
        return wait.ExponentialBackoff(d.cloud.RequestBackoff(), func() (bool, error) {
16✔
660
                var err error
8✔
661
                if len(secrets) > 0 {
9✔
662
                        container, getErr := getContainerReference(containerName, secrets, d.getCloudEnvironment())
1✔
663
                        if getErr != nil {
2✔
664
                                return true, getErr
1✔
665
                        }
1✔
666
                        _, err = container.CreateIfNotExists(&azstorage.CreateContainerOptions{Access: azstorage.ContainerAccessTypePrivate})
×
667
                } else {
7✔
668
                        blobContainer := storage.BlobContainer{
7✔
669
                                ContainerProperties: &storage.ContainerProperties{
7✔
670
                                        PublicAccess: storage.PublicAccessNone,
7✔
671
                                },
7✔
672
                        }
7✔
673
                        err = d.cloud.BlobClient.CreateContainer(ctx, subsID, resourceGroupName, accountName, containerName, blobContainer).Error()
7✔
674
                }
7✔
675
                if err != nil {
11✔
676
                        if strings.Contains(err.Error(), containerBeingDeletedDataplaneAPIError) ||
4✔
677
                                strings.Contains(err.Error(), containerBeingDeletedManagementAPIError) {
7✔
678
                                klog.Warningf("CreateContainer(%s, %s, %s) failed with error(%v), retry", resourceGroupName, accountName, containerName, err)
3✔
679
                                return false, nil
3✔
680
                        }
3✔
681
                }
682
                return true, err
4✔
683
        })
684
}
685

686
// DeleteBlobContainer deletes a blob container
687
func (d *Driver) DeleteBlobContainer(ctx context.Context, subsID, resourceGroupName, accountName, containerName string, secrets map[string]string) error {
8✔
688
        if containerName == "" {
9✔
689
                return fmt.Errorf("containerName is empty")
1✔
690
        }
1✔
691
        return wait.ExponentialBackoff(d.cloud.RequestBackoff(), func() (bool, error) {
14✔
692
                var err error
7✔
693
                if len(secrets) > 0 {
10✔
694
                        container, getErr := getContainerReference(containerName, secrets, d.getCloudEnvironment())
3✔
695
                        if getErr != nil {
6✔
696
                                return true, getErr
3✔
697
                        }
3✔
698
                        _, err = container.DeleteIfExists(nil)
×
699
                } else {
4✔
700
                        err = d.cloud.BlobClient.DeleteContainer(ctx, subsID, resourceGroupName, accountName, containerName).Error()
4✔
701
                }
4✔
702
                if err != nil {
7✔
703
                        if strings.Contains(err.Error(), containerBeingDeletedDataplaneAPIError) ||
3✔
704
                                strings.Contains(err.Error(), containerBeingDeletedManagementAPIError) ||
3✔
705
                                strings.Contains(err.Error(), statusCodeNotFound) ||
3✔
706
                                strings.Contains(err.Error(), httpCodeNotFound) {
5✔
707
                                klog.Warningf("delete container(%s) on account(%s) failed with error(%v), return as success", containerName, accountName, err)
2✔
708
                                return true, nil
2✔
709
                        }
2✔
710
                        return false, fmt.Errorf("failed to delete container(%s) on account(%s), error: %w", containerName, accountName, err)
1✔
711
                }
712
                return true, err
1✔
713
        })
714
}
715

716
// CopyBlobContainer copies a blob container in the same storage account
717
func (d *Driver) copyBlobContainer(req *csi.CreateVolumeRequest, accountSasToken string, authAzcopyEnv []string, dstContainerName, storageEndpointSuffix string) error {
6✔
718
        var sourceVolumeID string
6✔
719
        if req.GetVolumeContentSource() != nil && req.GetVolumeContentSource().GetVolume() != nil {
12✔
720
                sourceVolumeID = req.GetVolumeContentSource().GetVolume().GetVolumeId()
6✔
721

6✔
722
        }
6✔
723
        resourceGroupName, accountName, srcContainerName, _, _, err := GetContainerInfo(sourceVolumeID) //nolint:dogsled
6✔
724
        if err != nil {
8✔
725
                return status.Error(codes.NotFound, err.Error())
2✔
726
        }
2✔
727
        if srcContainerName == "" || dstContainerName == "" {
6✔
728
                return fmt.Errorf("srcContainerName(%s) or dstContainerName(%s) is empty", srcContainerName, dstContainerName)
2✔
729
        }
2✔
730

731
        timeAfter := time.After(time.Duration(d.waitForAzCopyTimeoutMinutes) * time.Minute)
2✔
732
        timeTick := time.Tick(waitForAzCopyInterval)
2✔
733
        srcPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, srcContainerName, accountSasToken)
2✔
734
        dstPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, dstContainerName, accountSasToken)
2✔
735

2✔
736
        jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
2✔
737
        klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
2✔
738
        if jobState == util.AzcopyJobError || jobState == util.AzcopyJobCompleted {
3✔
739
                return err
1✔
740
        }
1✔
741
        klog.V(2).Infof("begin to copy blob container %s to %s", srcContainerName, dstContainerName)
1✔
742
        for {
2✔
743
                select {
1✔
744
                case <-timeTick:
1✔
745
                        jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
1✔
746
                        klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
1✔
747
                        switch jobState {
1✔
748
                        case util.AzcopyJobError, util.AzcopyJobCompleted:
1✔
749
                                return err
1✔
750
                        case util.AzcopyJobNotFound:
×
751
                                klog.V(2).Infof("copy blob container %s to %s", srcContainerName, dstContainerName)
×
752
                                cmd := exec.Command("azcopy", "copy", srcPath, dstPath, "--recursive", "--check-length=false")
×
753
                                if len(authAzcopyEnv) > 0 {
×
754
                                        cmd.Env = append(os.Environ(), authAzcopyEnv...)
×
755
                                }
×
756
                                out, copyErr := cmd.CombinedOutput()
×
757
                                if copyErr != nil {
×
758
                                        klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error(%v): %v", resourceGroupName, accountName, dstPath, copyErr, string(out))
×
759
                                } else {
×
760
                                        klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName)
×
761
                                }
×
762
                                return copyErr
×
763
                        }
764
                case <-timeAfter:
×
765
                        return fmt.Errorf("timeout waiting for copy blob container %s to %s succeed", srcContainerName, dstContainerName)
×
766
                }
767
        }
768
}
769

770
// copyVolume copies a volume form volume or snapshot, snapshot is not supported now
771
func (d *Driver) copyVolume(req *csi.CreateVolumeRequest, accountSASToken string, authAzcopyEnv []string, dstContainerName, storageEndpointSuffix string) error {
8✔
772
        vs := req.VolumeContentSource
8✔
773
        switch vs.Type.(type) {
8✔
774
        case *csi.VolumeContentSource_Snapshot:
2✔
775
                return status.Errorf(codes.InvalidArgument, "copy volume from volumeSnapshot is not supported")
2✔
776
        case *csi.VolumeContentSource_Volume:
6✔
777
                return d.copyBlobContainer(req, accountSASToken, authAzcopyEnv, dstContainerName, storageEndpointSuffix)
6✔
778
        default:
×
779
                return status.Errorf(codes.InvalidArgument, "%v is not a proper volume source", vs)
×
780
        }
781
}
782

783
// authorizeAzcopyWithIdentity returns auth env for azcopy using cluster identity
784
func (d *Driver) authorizeAzcopyWithIdentity() ([]string, error) {
7✔
785
        azureAuthConfig := d.cloud.Config.AzureAuthConfig
7✔
786
        var authAzcopyEnv []string
7✔
787
        if azureAuthConfig.UseManagedIdentityExtension {
11✔
788
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyAutoLoginType, MSI))
4✔
789
                if len(azureAuthConfig.UserAssignedIdentityID) > 0 {
5✔
790
                        klog.V(2).Infof("use user assigned managed identity to authorize azcopy")
1✔
791
                        authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyMSIClientID, azureAuthConfig.UserAssignedIdentityID))
1✔
792
                } else {
4✔
793
                        klog.V(2).Infof("use system-assigned managed identity to authorize azcopy")
3✔
794
                }
3✔
795
                return authAzcopyEnv, nil
4✔
796
        }
797
        if len(azureAuthConfig.AADClientSecret) > 0 {
5✔
798
                klog.V(2).Infof("use service principal to authorize azcopy")
2✔
799
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyAutoLoginType, SPN))
2✔
800
                if azureAuthConfig.AADClientID == "" || azureAuthConfig.TenantID == "" {
3✔
801
                        return []string{}, fmt.Errorf("AADClientID and TenantID must be set when use service principal")
1✔
802
                }
1✔
803
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopySPAApplicationID, azureAuthConfig.AADClientID))
1✔
804
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopySPAClientSecret, azureAuthConfig.AADClientSecret))
1✔
805
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyTenantID, azureAuthConfig.TenantID))
1✔
806
                klog.V(2).Infof(fmt.Sprintf("set AZCOPY_SPA_APPLICATION_ID=%s, AZCOPY_TENANT_ID=%s successfully", azureAuthConfig.AADClientID, azureAuthConfig.TenantID))
1✔
807

1✔
808
                return authAzcopyEnv, nil
1✔
809
        }
810
        return []string{}, fmt.Errorf("service principle or managed identity are both not set")
1✔
811
}
812

813
// getAzcopyAuth will only generate sas token for azcopy in following conditions:
814
// 1. secrets is not empty
815
// 2. driver is not using managed identity and service principal
816
// 3. azcopy returns AuthorizationPermissionMismatch error when using service principal or managed identity
817
func (d *Driver) getAzcopyAuth(ctx context.Context, accountName, accountKey, storageEndpointSuffix string, accountOptions *azure.AccountOptions, secrets map[string]string, secretName, secretNamespace string) (string, []string, error) {
6✔
818
        var authAzcopyEnv []string
6✔
819
        var err error
6✔
820
        useSasToken := false
6✔
821
        if !d.useDataPlaneAPI("", accountName) && len(secrets) == 0 && len(secretName) == 0 {
8✔
822
                // search in cache first
2✔
823
                if cache, err := d.azcopySasTokenCache.Get(accountName, azcache.CacheReadTypeDefault); err == nil && cache != nil {
2✔
824
                        klog.V(2).Infof("use sas token for account(%s) since this account is found in azcopySasTokenCache", accountName)
×
825
                        return cache.(string), nil, nil
×
826
                }
×
827

828
                authAzcopyEnv, err = d.authorizeAzcopyWithIdentity()
2✔
829
                if err != nil {
2✔
830
                        klog.Warningf("failed to authorize azcopy with identity, error: %v", err)
×
831
                } else {
2✔
832
                        if len(authAzcopyEnv) > 0 {
4✔
833
                                out, testErr := d.azcopy.TestListJobs(accountName, storageEndpointSuffix, authAzcopyEnv)
2✔
834
                                if testErr != nil {
2✔
835
                                        return "", nil, fmt.Errorf("azcopy list command failed with error(%v): %v", testErr, out)
×
836
                                }
×
837
                                if strings.Contains(out, authorizationPermissionMismatch) {
2✔
838
                                        klog.Warningf("azcopy list failed with AuthorizationPermissionMismatch error, should assign \"Storage Blob Data Contributor\" role to controller identity, fall back to use sas token, original output: %v", out)
×
839
                                        useSasToken = true
×
840
                                }
×
841
                        }
842
                }
843
        }
844

845
        if len(secrets) > 0 || len(secretName) > 0 || len(authAzcopyEnv) == 0 || useSasToken {
10✔
846
                if accountKey == "" {
8✔
847
                        if _, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
5✔
848
                                return "", nil, err
1✔
849
                        }
1✔
850
                }
851
                klog.V(2).Infof("generate sas token for account(%s)", accountName)
3✔
852
                sasToken, err := d.generateSASToken(accountName, accountKey, storageEndpointSuffix, d.sasTokenExpirationMinutes)
3✔
853
                return sasToken, nil, err
3✔
854
        }
855
        return "", authAzcopyEnv, nil
2✔
856
}
857

858
// isValidVolumeCapabilities validates the given VolumeCapability array is valid
859
func isValidVolumeCapabilities(volCaps []*csi.VolumeCapability) error {
27✔
860
        if len(volCaps) == 0 {
29✔
861
                return fmt.Errorf("volume capabilities missing in request")
2✔
862
        }
2✔
863
        for _, c := range volCaps {
50✔
864
                if c.GetBlock() != nil {
27✔
865
                        return fmt.Errorf("block volume capability not supported")
2✔
866
                }
2✔
867
        }
868
        return nil
23✔
869
}
870

871
func parseDays(dayStr string) (int32, error) {
3✔
872
        days, err := strconv.Atoi(dayStr)
3✔
873
        if err != nil {
4✔
874
                return 0, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid %s:%s in storage class", softDeleteBlobsField, dayStr))
1✔
875
        }
1✔
876
        if days <= 0 || days > 365 {
3✔
877
                return 0, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid %s:%s in storage class, should be in range [1, 365]", softDeleteBlobsField, dayStr))
1✔
878
        }
1✔
879

880
        return int32(days), nil
1✔
881
}
882

883
// generateSASToken generate a sas token for storage account
884
func (d *Driver) generateSASToken(accountName, accountKey, storageEndpointSuffix string, expiryTime int) (string, error) {
5✔
885
        // search in cache first
5✔
886
        cache, err := d.azcopySasTokenCache.Get(accountName, azcache.CacheReadTypeDefault)
5✔
887
        if err != nil {
5✔
888
                return "", fmt.Errorf("get(%s) from azcopySasTokenCache failed with error: %v", accountName, err)
×
889
        }
×
890
        if cache != nil {
5✔
891
                klog.V(2).Infof("use sas token for account(%s) since this account is found in azcopySasTokenCache", accountName)
×
892
                return cache.(string), nil
×
893
        }
×
894

895
        credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
5✔
896
        if err != nil {
8✔
897
                return "", status.Errorf(codes.Internal, fmt.Sprintf("failed to generate sas token in creating new shared key credential, accountName: %s, err: %s", accountName, err.Error()))
3✔
898
        }
3✔
899
        serviceClient, err := service.NewClientWithSharedKeyCredential(fmt.Sprintf("https://%s.blob.%s/", accountName, storageEndpointSuffix), credential, nil)
2✔
900
        if err != nil {
2✔
901
                return "", status.Errorf(codes.Internal, fmt.Sprintf("failed to generate sas token in creating new client with shared key credential, accountName: %s, err: %s", accountName, err.Error()))
×
902
        }
×
903
        sasURL, err := serviceClient.GetSASURL(
2✔
904
                sas.AccountResourceTypes{Object: true, Service: false, Container: true},
2✔
905
                sas.AccountPermissions{Read: true, List: true, Write: true},
2✔
906
                time.Now().Add(time.Duration(expiryTime)*time.Minute),
2✔
907
                &service.GetSASURLOptions{StartTime: to.Ptr(time.Now())},
2✔
908
        )
2✔
909
        if err != nil {
2✔
910
                return "", err
×
911
        }
×
912
        u, err := url.Parse(sasURL)
2✔
913
        if err != nil {
2✔
914
                return "", err
×
915
        }
×
916
        sasToken := "?" + u.RawQuery
2✔
917
        d.azcopySasTokenCache.Set(accountName, sasToken)
2✔
918
        return sasToken, nil
2✔
919
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc