• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubernetes-sigs / blob-csi-driver / 9289598898

29 May 2024 04:17PM UTC coverage: 74.251%. Remained the same
9289598898

Pull #1406

github

web-flow
chore(deps): bump sigs.k8s.io/cloud-provider-azure/pkg/azclient

Bumps [sigs.k8s.io/cloud-provider-azure/pkg/azclient](https://github.com/kubernetes-sigs/cloud-provider-azure) from 0.0.21 to 0.0.23.
- [Release notes](https://github.com/kubernetes-sigs/cloud-provider-azure/releases)
- [Changelog](https://github.com/kubernetes-sigs/cloud-provider-azure/blob/master/docs/release-versioning.md)
- [Commits](https://github.com/kubernetes-sigs/cloud-provider-azure/compare/pkg/azclient/v0.0.21...pkg/azclient/v0.0.23)

---
updated-dependencies:
- dependency-name: sigs.k8s.io/cloud-provider-azure/pkg/azclient
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Pull Request #1406: chore(deps): bump sigs.k8s.io/cloud-provider-azure/pkg/azclient from 0.0.21 to 0.0.23

2206 of 2971 relevant lines covered (74.25%)

7.09 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.37
/pkg/blob/controllerserver.go
1
/*
2
Copyright 2017 The Kubernetes Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package blob
18

19
import (
20
        "context"
21
        "fmt"
22
        "net/url"
23
        "os"
24
        "os/exec"
25
        "strconv"
26
        "strings"
27
        "time"
28

29
        "google.golang.org/grpc/codes"
30
        "google.golang.org/grpc/status"
31

32
        "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
33
        "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage"
34
        "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
35
        "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
36
        "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
37
        "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2021-09-01/storage"
38
        azstorage "github.com/Azure/azure-sdk-for-go/storage"
39
        "github.com/container-storage-interface/spec/lib/go/csi"
40

41
        "k8s.io/apimachinery/pkg/util/wait"
42
        "k8s.io/klog/v2"
43
        "k8s.io/utils/pointer"
44

45
        "sigs.k8s.io/blob-csi-driver/pkg/util"
46
        "sigs.k8s.io/cloud-provider-azure/pkg/azclient/blobcontainerclient"
47
        azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache"
48
        "sigs.k8s.io/cloud-provider-azure/pkg/metrics"
49
        "sigs.k8s.io/cloud-provider-azure/pkg/provider"
50
        azure "sigs.k8s.io/cloud-provider-azure/pkg/provider"
51
)
52

53
// privateEndpoint is the networkEndpointType value (matched case-insensitively
// in CreateVolume) that requests a private endpoint for the storage account.
const privateEndpoint = "privateendpoint"

// Names used to pass credentials to azcopy.
// NOTE(review): these look like environment variable names consumed by
// azcopy — confirm against the azcopy auth helper elsewhere in this package.
const (
	azcopyAutoLoginType    = "AZCOPY_AUTO_LOGIN_TYPE"
	azcopySPAApplicationID = "AZCOPY_SPA_APPLICATION_ID"
	azcopySPAClientSecret  = "AZCOPY_SPA_CLIENT_SECRET"
	azcopyTenantID         = "AZCOPY_TENANT_ID"
	azcopyMSIClientID      = "AZCOPY_MSI_CLIENT_ID"
)

// Identity kinds and error markers.
// NOTE(review): MSI/SPN are presumably the values assigned to
// AZCOPY_AUTO_LOGIN_TYPE, and authorizationPermissionMismatch the error code
// text matched in azcopy/storage failures — verify against their callers.
const (
	MSI = "MSI"
	SPN = "SPN"

	authorizationPermissionMismatch = "AuthorizationPermissionMismatch"
)
65

66
// CreateVolume provisions a volume
67
func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
22✔
68
        if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
22✔
69
                klog.Errorf("invalid create volume req: %v", req)
×
70
                return nil, err
×
71
        }
×
72

73
        volName := req.GetName()
22✔
74
        if len(volName) == 0 {
23✔
75
                return nil, status.Error(codes.InvalidArgument, "CreateVolume Name must be provided")
1✔
76
        }
1✔
77

78
        if err := isValidVolumeCapabilities(req.GetVolumeCapabilities()); err != nil {
23✔
79
                return nil, status.Error(codes.InvalidArgument, err.Error())
2✔
80
        }
2✔
81

82
        if acquired := d.volumeLocks.TryAcquire(volName); !acquired {
19✔
83
                // logging the job status if it's volume cloning
×
84
                if req.GetVolumeContentSource() != nil {
×
85
                        jobState, percent, err := d.azcopy.GetAzcopyJob(volName, []string{})
×
86
                        return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsWithAzcopyFmt, volName, jobState, percent, err)
×
87
                }
×
88
                return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName)
×
89
        }
90
        defer d.volumeLocks.Release(volName)
19✔
91

19✔
92
        volSizeBytes := int64(req.GetCapacityRange().GetRequiredBytes())
19✔
93
        requestGiB := int(util.RoundUpGiB(volSizeBytes))
19✔
94

19✔
95
        parameters := req.GetParameters()
19✔
96
        if parameters == nil {
20✔
97
                parameters = make(map[string]string)
1✔
98
        }
1✔
99
        var storageAccountType, subsID, resourceGroup, location, account, containerName, containerNamePrefix, protocol, customTags, secretName, secretNamespace, pvcNamespace string
19✔
100
        var isHnsEnabled, requireInfraEncryption, enableBlobVersioning, createPrivateEndpoint, enableNfsV3 *bool
19✔
101
        var vnetResourceGroup, vnetName, subnetName, accessTier, networkEndpointType, storageEndpointSuffix, fsGroupChangePolicy string
19✔
102
        var matchTags, useDataPlaneAPI, getLatestAccountKey bool
19✔
103
        var softDeleteBlobs, softDeleteContainers int32
19✔
104
        var vnetResourceIDs []string
19✔
105
        var err error
19✔
106
        // set allowBlobPublicAccess as false by default
19✔
107
        allowBlobPublicAccess := pointer.Bool(false)
19✔
108

19✔
109
        containerNameReplaceMap := map[string]string{}
19✔
110

19✔
111
        // store account key to k8s secret by default
19✔
112
        storeAccountKey := true
19✔
113

19✔
114
        // Apply ProvisionerParameters (case-insensitive). We leave validation of
19✔
115
        // the values to the cloud provider.
19✔
116
        for k, v := range parameters {
114✔
117
                switch strings.ToLower(k) {
95✔
118
                case skuNameField:
9✔
119
                        storageAccountType = v
9✔
120
                case storageAccountTypeField:
10✔
121
                        storageAccountType = v
10✔
122
                case locationField:
9✔
123
                        location = v
9✔
124
                case storageAccountField:
10✔
125
                        account = v
10✔
126
                case subscriptionIDField:
×
127
                        subsID = v
×
128
                case resourceGroupField:
9✔
129
                        resourceGroup = v
9✔
130
                case containerNameField:
10✔
131
                        containerName = v
10✔
132
                case containerNamePrefixField:
2✔
133
                        containerNamePrefix = v
2✔
134
                case protocolField:
9✔
135
                        protocol = v
9✔
136
                case tagsField:
1✔
137
                        customTags = v
1✔
138
                case matchTagsField:
1✔
139
                        matchTags = strings.EqualFold(v, trueValue)
1✔
140
                case secretNameField:
×
141
                        secretName = v
×
142
                case secretNamespaceField:
×
143
                        secretNamespace = v
×
144
                case isHnsEnabledField:
×
145
                        if strings.EqualFold(v, trueValue) {
×
146
                                isHnsEnabled = pointer.Bool(true)
×
147
                        }
×
148
                case softDeleteBlobsField:
×
149
                        days, err := parseDays(v)
×
150
                        if err != nil {
×
151
                                return nil, err
×
152
                        }
×
153
                        softDeleteBlobs = days
×
154
                case softDeleteContainersField:
×
155
                        days, err := parseDays(v)
×
156
                        if err != nil {
×
157
                                return nil, err
×
158
                        }
×
159
                        softDeleteContainers = days
×
160
                case enableBlobVersioningField:
×
161
                        enableBlobVersioning = pointer.Bool(strings.EqualFold(v, trueValue))
×
162
                case storeAccountKeyField:
3✔
163
                        if strings.EqualFold(v, falseValue) {
5✔
164
                                storeAccountKey = false
2✔
165
                        }
2✔
166
                case getLatestAccountKeyField:
1✔
167
                        if getLatestAccountKey, err = strconv.ParseBool(v); err != nil {
2✔
168
                                return nil, status.Errorf(codes.InvalidArgument, "invalid %s: %s in volume context", getLatestAccountKeyField, v)
1✔
169
                        }
1✔
170
                case allowBlobPublicAccessField:
×
171
                        if strings.EqualFold(v, trueValue) {
×
172
                                allowBlobPublicAccess = pointer.Bool(true)
×
173
                        }
×
174
                case requireInfraEncryptionField:
×
175
                        if strings.EqualFold(v, trueValue) {
×
176
                                requireInfraEncryption = pointer.Bool(true)
×
177
                        }
×
178
                case pvcNamespaceKey:
×
179
                        pvcNamespace = v
×
180
                        containerNameReplaceMap[pvcNamespaceMetadata] = v
×
181
                case pvcNameKey:
×
182
                        containerNameReplaceMap[pvcNameMetadata] = v
×
183
                case pvNameKey:
×
184
                        containerNameReplaceMap[pvNameMetadata] = v
×
185
                case serverNameField:
×
186
                case storageAuthTypeField:
1✔
187
                case storageIdentityClientIDField:
1✔
188
                case storageIdentityObjectIDField:
1✔
189
                case storageIdentityResourceIDField:
1✔
190
                case msiEndpointField:
1✔
191
                case storageAADEndpointField:
1✔
192
                        // no op, only used in NodeStageVolume
193
                case storageEndpointSuffixField:
×
194
                        storageEndpointSuffix = v
×
195
                case vnetResourceGroupField:
×
196
                        vnetResourceGroup = v
×
197
                case vnetNameField:
×
198
                        vnetName = v
×
199
                case subnetNameField:
1✔
200
                        subnetName = v
1✔
201
                case accessTierField:
×
202
                        accessTier = v
×
203
                case networkEndpointTypeField:
1✔
204
                        networkEndpointType = v
1✔
205
                case mountPermissionsField:
10✔
206
                        // only do validations here, used in NodeStageVolume, NodePublishVolume
10✔
207
                        if v != "" {
20✔
208
                                if _, err := strconv.ParseUint(v, 8, 32); err != nil {
11✔
209
                                        return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid mountPermissions %s in storage class", v))
1✔
210
                                }
1✔
211
                        }
212
                case useDataPlaneAPIField:
1✔
213
                        useDataPlaneAPI = strings.EqualFold(v, trueValue)
1✔
214
                case fsGroupChangePolicyField:
1✔
215
                        fsGroupChangePolicy = v
1✔
216
                default:
1✔
217
                        return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid parameter %q in storage class", k))
1✔
218
                }
219
        }
220

221
        if pointer.BoolDeref(enableBlobVersioning, false) {
16✔
222
                if isNFSProtocol(protocol) || pointer.BoolDeref(isHnsEnabled, false) {
×
223
                        return nil, status.Errorf(codes.InvalidArgument, "enableBlobVersioning is not supported for NFS protocol or HNS enabled account")
×
224
                }
×
225
        }
226

227
        if !isSupportedFSGroupChangePolicy(fsGroupChangePolicy) {
17✔
228
                return nil, status.Errorf(codes.InvalidArgument, "fsGroupChangePolicy(%s) is not supported, supported fsGroupChangePolicy list: %v", fsGroupChangePolicy, supportedFSGroupChangePolicyList)
1✔
229
        }
1✔
230

231
        if matchTags && account != "" {
16✔
232
                return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("matchTags must set as false when storageAccount(%s) is provided", account))
1✔
233
        }
1✔
234

235
        if resourceGroup == "" {
20✔
236
                resourceGroup = d.cloud.ResourceGroup
6✔
237
        }
6✔
238

239
        if secretNamespace == "" {
28✔
240
                if pvcNamespace == "" {
28✔
241
                        secretNamespace = defaultNamespace
14✔
242
                } else {
14✔
243
                        secretNamespace = pvcNamespace
×
244
                }
×
245
        }
246

247
        if protocol == "" {
19✔
248
                protocol = Fuse
5✔
249
        }
5✔
250
        if !isSupportedProtocol(protocol) {
15✔
251
                return nil, status.Errorf(codes.InvalidArgument, "protocol(%s) is not supported, supported protocol list: %v", protocol, supportedProtocolList)
1✔
252
        }
1✔
253
        if !isSupportedAccessTier(accessTier) {
13✔
254
                return nil, status.Errorf(codes.InvalidArgument, "accessTier(%s) is not supported, supported AccessTier list: %v", accessTier, armstorage.PossibleAccessTierValues())
×
255
        }
×
256

257
        if containerName != "" && containerNamePrefix != "" {
14✔
258
                return nil, status.Errorf(codes.InvalidArgument, "containerName(%s) and containerNamePrefix(%s) could not be specified together", containerName, containerNamePrefix)
1✔
259
        }
1✔
260
        if !isSupportedContainerNamePrefix(containerNamePrefix) {
13✔
261
                return nil, status.Errorf(codes.InvalidArgument, "containerNamePrefix(%s) can only contain lowercase letters, numbers, hyphens, and length should be less than 21", containerNamePrefix)
1✔
262
        }
1✔
263

264
        enableHTTPSTrafficOnly := true
11✔
265
        if strings.EqualFold(networkEndpointType, privateEndpoint) {
12✔
266
                if strings.Contains(subnetName, ",") {
2✔
267
                        return nil, status.Errorf(codes.InvalidArgument, "subnetName(%s) can only contain one subnet for private endpoint", subnetName)
1✔
268
                }
1✔
269
                createPrivateEndpoint = pointer.BoolPtr(true)
×
270
        }
271
        accountKind := string(armstorage.KindStorageV2)
10✔
272
        if isNFSProtocol(protocol) {
11✔
273
                isHnsEnabled = pointer.Bool(true)
1✔
274
                enableNfsV3 = pointer.Bool(true)
1✔
275
                // NFS protocol does not need account key
1✔
276
                storeAccountKey = false
1✔
277
                if !pointer.BoolDeref(createPrivateEndpoint, false) {
2✔
278
                        // set VirtualNetworkResourceIDs for storage account firewall setting
1✔
279
                        subnets := strings.Split(subnetName, ",")
1✔
280
                        for _, subnet := range subnets {
2✔
281
                                subnet = strings.TrimSpace(subnet)
1✔
282
                                vnetResourceID := d.getSubnetResourceID(vnetResourceGroup, vnetName, subnet)
1✔
283
                                klog.V(2).Infof("set vnetResourceID(%s) for NFS protocol", vnetResourceID)
1✔
284
                                vnetResourceIDs = []string{vnetResourceID}
1✔
285
                                if err := d.updateSubnetServiceEndpoints(ctx, vnetResourceGroup, vnetName, subnet); err != nil {
2✔
286
                                        return nil, status.Errorf(codes.Internal, "update service endpoints failed with error: %v", err)
1✔
287
                                }
1✔
288
                        }
289
                }
290
        }
291

292
        if strings.HasPrefix(strings.ToLower(storageAccountType), "premium") {
10✔
293
                accountKind = string(armstorage.KindBlockBlobStorage)
1✔
294
        }
1✔
295
        if IsAzureStackCloud(d.cloud) {
10✔
296
                accountKind = string(armstorage.KindStorage)
1✔
297
                if storageAccountType != "" && storageAccountType != string(armstorage.SKUNameStandardLRS) && storageAccountType != string(armstorage.SKUNamePremiumLRS) {
2✔
298
                        return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("Invalid skuName value: %s, as Azure Stack only supports %s and %s Storage Account types.", storageAccountType, storage.SkuNamePremiumLRS, storage.SkuNameStandardLRS))
1✔
299
                }
1✔
300
        }
301

302
        tags, err := util.ConvertTagsToMap(customTags)
8✔
303
        if err != nil {
9✔
304
                return nil, status.Errorf(codes.InvalidArgument, err.Error())
1✔
305
        }
1✔
306

307
        if strings.TrimSpace(storageEndpointSuffix) == "" {
14✔
308
                storageEndpointSuffix = d.getStorageEndPointSuffix()
7✔
309
        }
7✔
310

311
        accountOptions := &azure.AccountOptions{
7✔
312
                Name:                            account,
7✔
313
                Type:                            storageAccountType,
7✔
314
                Kind:                            accountKind,
7✔
315
                SubscriptionID:                  subsID,
7✔
316
                ResourceGroup:                   resourceGroup,
7✔
317
                Location:                        location,
7✔
318
                EnableHTTPSTrafficOnly:          enableHTTPSTrafficOnly,
7✔
319
                VirtualNetworkResourceIDs:       vnetResourceIDs,
7✔
320
                Tags:                            tags,
7✔
321
                MatchTags:                       matchTags,
7✔
322
                IsHnsEnabled:                    isHnsEnabled,
7✔
323
                EnableNfsV3:                     enableNfsV3,
7✔
324
                AllowBlobPublicAccess:           allowBlobPublicAccess,
7✔
325
                RequireInfrastructureEncryption: requireInfraEncryption,
7✔
326
                VNetResourceGroup:               vnetResourceGroup,
7✔
327
                VNetName:                        vnetName,
7✔
328
                SubnetName:                      subnetName,
7✔
329
                AccessTier:                      accessTier,
7✔
330
                CreatePrivateEndpoint:           createPrivateEndpoint,
7✔
331
                StorageType:                     provider.StorageTypeBlob,
7✔
332
                StorageEndpointSuffix:           storageEndpointSuffix,
7✔
333
                EnableBlobVersioning:            enableBlobVersioning,
7✔
334
                SoftDeleteBlobs:                 softDeleteBlobs,
7✔
335
                SoftDeleteContainers:            softDeleteContainers,
7✔
336
                GetLatestAccountKey:             getLatestAccountKey,
7✔
337
        }
7✔
338

7✔
339
        var volumeID string
7✔
340
        requestName := "controller_create_volume"
7✔
341
        if req.GetVolumeContentSource() != nil {
9✔
342
                switch req.VolumeContentSource.Type.(type) {
2✔
343
                case *csi.VolumeContentSource_Snapshot:
1✔
344
                        requestName = "controller_create_volume_from_snapshot"
1✔
345
                case *csi.VolumeContentSource_Volume:
1✔
346
                        requestName = "controller_create_volume_from_volume"
1✔
347
                }
348
        }
349
        mc := metrics.NewMetricContext(blobCSIDriverName, requestName, d.cloud.ResourceGroup, d.cloud.SubscriptionID, d.Name)
7✔
350
        isOperationSucceeded := false
7✔
351
        defer func() {
14✔
352
                mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
7✔
353
        }()
7✔
354

355
        var accountKey string
7✔
356
        accountName := account
7✔
357
        secrets := req.GetSecrets()
7✔
358
        if len(secrets) == 0 && accountName == "" {
8✔
359
                if v, ok := d.volMap.Load(volName); ok {
1✔
360
                        accountName = v.(string)
×
361
                } else {
1✔
362
                        lockKey := fmt.Sprintf("%s%s%s%s%s%v", storageAccountType, accountKind, resourceGroup, location, protocol, pointer.BoolDeref(createPrivateEndpoint, false))
1✔
363
                        // search in cache first
1✔
364
                        cache, err := d.accountSearchCache.Get(lockKey, azcache.CacheReadTypeDefault)
1✔
365
                        if err != nil {
1✔
366
                                return nil, status.Errorf(codes.Internal, err.Error())
×
367
                        }
×
368
                        if cache != nil {
1✔
369
                                accountName = cache.(string)
×
370
                        } else {
1✔
371
                                d.volLockMap.LockEntry(lockKey)
1✔
372
                                err = wait.ExponentialBackoff(d.cloud.RequestBackoff(), func() (bool, error) {
2✔
373
                                        var retErr error
1✔
374
                                        accountName, accountKey, retErr = d.cloud.EnsureStorageAccount(ctx, accountOptions, protocol)
1✔
375
                                        if isRetriableError(retErr) {
1✔
376
                                                klog.Warningf("EnsureStorageAccount(%s) failed with error(%v), waiting for retrying", account, retErr)
×
377
                                                return false, nil
×
378
                                        }
×
379
                                        return true, retErr
1✔
380
                                })
381
                                d.volLockMap.UnlockEntry(lockKey)
1✔
382
                                if err != nil {
2✔
383
                                        return nil, status.Errorf(codes.Internal, "ensure storage account failed with %v", err)
1✔
384
                                }
1✔
385
                                d.accountSearchCache.Set(lockKey, accountName)
×
386
                                d.volMap.Store(volName, accountName)
×
387
                        }
388
                }
389
        }
390

391
        if pointer.BoolDeref(createPrivateEndpoint, false) && isNFSProtocol(protocol) {
6✔
392
                // As for blobfuse/blobfuse2, serverName, i.e.,AZURE_STORAGE_BLOB_ENDPOINT env variable can't include
×
393
                // "privatelink", issue: https://github.com/Azure/azure-storage-fuse/issues/1014
×
394
                //
×
395
                // And use public endpoint will be befine to blobfuse/blobfuse2, because it will be resolved to private endpoint
×
396
                // by private dns zone, which includes CNAME record, documented here:
×
397
                // https://learn.microsoft.com/en-us/azure/storage/common/storage-private-endpoints?toc=%2Fazure%2Fstorage%2Fblobs%2Ftoc.json&bc=%2Fazure%2Fstorage%2Fblobs%2Fbreadcrumb%2Ftoc.json#dns-changes-for-private-endpoints
×
398
                setKeyValueInMap(parameters, serverNameField, fmt.Sprintf("%s.privatelink.blob.%s", accountName, storageEndpointSuffix))
×
399
        }
×
400

401
        accountOptions.Name = accountName
6✔
402
        if len(secrets) == 0 && useDataPlaneAPI {
7✔
403
                if accountKey == "" {
2✔
404
                        if accountName, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
2✔
405
                                return nil, status.Errorf(codes.Internal, "failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
1✔
406
                        }
1✔
407
                }
408
                secrets = createStorageAccountSecret(accountName, accountKey)
×
409
        }
410

411
        // replace pv/pvc name namespace metadata in subDir
412
        containerName = replaceWithMap(containerName, containerNameReplaceMap)
5✔
413
        validContainerName := containerName
5✔
414
        if validContainerName == "" {
5✔
415
                validContainerName = volName
×
416
                if containerNamePrefix != "" {
×
417
                        validContainerName = containerNamePrefix + "-" + volName
×
418
                }
×
419
                validContainerName = getValidContainerName(validContainerName, protocol)
×
420
                setKeyValueInMap(parameters, containerNameField, validContainerName)
×
421
        }
422

423
        if req.GetVolumeContentSource() != nil {
7✔
424
                accountSASToken, authAzcopyEnv, err := d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace)
2✔
425
                if err != nil {
2✔
426
                        return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
×
427
                }
×
428
                if err := d.copyVolume(req, accountSASToken, authAzcopyEnv, validContainerName, storageEndpointSuffix); err != nil {
4✔
429
                        return nil, err
2✔
430
                }
2✔
431
        } else {
3✔
432
                klog.V(2).Infof("begin to create container(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d)", validContainerName, accountName, storageAccountType, subsID, resourceGroup, location, requestGiB)
3✔
433
                if err := d.CreateBlobContainer(ctx, subsID, resourceGroup, accountName, validContainerName, secrets); err != nil {
4✔
434
                        return nil, status.Errorf(codes.Internal, "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v", validContainerName, accountName, storageAccountType, resourceGroup, location, requestGiB, err)
1✔
435
                }
1✔
436
        }
437

438
        if storeAccountKey && len(req.GetSecrets()) == 0 {
4✔
439
                if accountKey == "" {
4✔
440
                        if accountName, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
3✔
441
                                return nil, status.Errorf(codes.Internal, "failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
1✔
442
                        }
1✔
443
                }
444

445
                secretName, err := setAzureCredentials(ctx, d.KubeClient, accountName, accountKey, secretNamespace)
1✔
446
                if err != nil {
1✔
447
                        return nil, status.Errorf(codes.Internal, "failed to store storage account key: %v", err)
×
448
                }
×
449
                if secretName != "" {
1✔
450
                        klog.V(2).Infof("store account key to k8s secret(%v) in %s namespace", secretName, secretNamespace)
×
451
                }
×
452
        }
453

454
        var uuid string
1✔
455
        if containerName != "" {
2✔
456
                // add volume name as suffix to differentiate volumeID since "containerName" is specified
1✔
457
                // not necessary for dynamic container name creation since volumeID already contains volume name
1✔
458
                uuid = volName
1✔
459
        }
1✔
460
        volumeID = fmt.Sprintf(volumeIDTemplate, resourceGroup, accountName, validContainerName, uuid, secretNamespace, subsID)
1✔
461
        klog.V(2).Infof("create container %s on storage account %s successfully", validContainerName, accountName)
1✔
462

1✔
463
        if useDataPlaneAPI {
1✔
464
                d.dataPlaneAPIVolCache.Set(volumeID, "")
×
465
                d.dataPlaneAPIVolCache.Set(accountName, "")
×
466
        }
×
467

468
        isOperationSucceeded = true
1✔
469
        // reset secretNamespace field in VolumeContext
1✔
470
        setKeyValueInMap(parameters, secretNamespaceField, secretNamespace)
1✔
471
        return &csi.CreateVolumeResponse{
1✔
472
                Volume: &csi.Volume{
1✔
473
                        VolumeId:      volumeID,
1✔
474
                        CapacityBytes: req.GetCapacityRange().GetRequiredBytes(),
1✔
475
                        VolumeContext: parameters,
1✔
476
                        ContentSource: req.GetVolumeContentSource(),
1✔
477
                },
1✔
478
        }, nil
1✔
479
}
480

481
// DeleteVolume delete a volume
482
func (d *Driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
5✔
483
        volumeID := req.GetVolumeId()
5✔
484
        if len(volumeID) == 0 {
6✔
485
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
486
        }
1✔
487

488
        if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
4✔
489
                return nil, status.Errorf(codes.Internal, "invalid delete volume req: %v", req)
×
490
        }
×
491

492
        if acquired := d.volumeLocks.TryAcquire(volumeID); !acquired {
4✔
493
                return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
×
494
        }
×
495
        defer d.volumeLocks.Release(volumeID)
4✔
496

4✔
497
        resourceGroupName, accountName, containerName, _, subsID, err := GetContainerInfo(volumeID)
4✔
498
        if err != nil {
5✔
499
                // According to CSI Driver Sanity Tester, should succeed when an invalid volume id is used
1✔
500
                klog.Errorf("GetContainerInfo(%s) in DeleteVolume failed with error: %v", volumeID, err)
1✔
501
                return &csi.DeleteVolumeResponse{}, nil
1✔
502
        }
1✔
503

504
        secrets := req.GetSecrets()
3✔
505
        if len(secrets) == 0 && d.useDataPlaneAPI(volumeID, accountName) {
4✔
506
                _, accountName, accountKey, _, _, err := d.GetAuthEnv(ctx, volumeID, "", nil, secrets)
1✔
507
                if err != nil {
2✔
508
                        return nil, status.Errorf(codes.Internal, "GetAuthEnv(%s) failed with %v", volumeID, err)
1✔
509
                }
1✔
510
                if accountName != "" && accountKey != "" {
×
511
                        secrets = createStorageAccountSecret(accountName, accountKey)
×
512
                }
×
513
        }
514

515
        mc := metrics.NewMetricContext(blobCSIDriverName, "controller_delete_volume", d.cloud.ResourceGroup, d.cloud.SubscriptionID, d.Name)
2✔
516
        isOperationSucceeded := false
2✔
517
        defer func() {
4✔
518
                mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
2✔
519
        }()
2✔
520

521
        if resourceGroupName == "" {
3✔
522
                resourceGroupName = d.cloud.ResourceGroup
1✔
523
        }
1✔
524
        klog.V(2).Infof("deleting container(%s) rg(%s) account(%s) volumeID(%s)", containerName, resourceGroupName, accountName, volumeID)
2✔
525
        if err := d.DeleteBlobContainer(ctx, subsID, resourceGroupName, accountName, containerName, secrets); err != nil {
4✔
526
                return nil, status.Errorf(codes.Internal, "failed to delete container(%s) under rg(%s) account(%s) volumeID(%s), error: %v", containerName, resourceGroupName, accountName, volumeID, err)
2✔
527
        }
2✔
528

529
        isOperationSucceeded = true
×
530
        klog.V(2).Infof("container(%s) under rg(%s) account(%s) volumeID(%s) is deleted successfully", containerName, resourceGroupName, accountName, volumeID)
×
531
        return &csi.DeleteVolumeResponse{}, nil
×
532
}
533

534
// ValidateVolumeCapabilities return the capabilities of the volume
535
func (d *Driver) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
8✔
536
        volumeID := req.GetVolumeId()
8✔
537
        if len(volumeID) == 0 {
9✔
538
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
539
        }
1✔
540
        if err := isValidVolumeCapabilities(req.GetVolumeCapabilities()); err != nil {
9✔
541
                return nil, status.Error(codes.InvalidArgument, err.Error())
2✔
542
        }
2✔
543

544
        resourceGroupName, accountName, containerName, _, subsID, err := GetContainerInfo(volumeID)
5✔
545
        if err != nil {
6✔
546
                klog.Errorf("GetContainerInfo(%s) in ValidateVolumeCapabilities failed with error: %v", volumeID, err)
1✔
547
                return nil, status.Error(codes.NotFound, err.Error())
1✔
548
        }
1✔
549

550
        var exist bool
4✔
551
        secrets := req.GetSecrets()
4✔
552
        if len(secrets) > 0 {
5✔
553
                container, err := getContainerReference(containerName, secrets, d.getCloudEnvironment())
1✔
554
                if err != nil {
2✔
555
                        return nil, status.Error(codes.Internal, err.Error())
1✔
556
                }
1✔
557
                exist, err = container.Exists()
×
558
                if err != nil {
×
559
                        return nil, status.Error(codes.Internal, err.Error())
×
560
                }
×
561
        } else {
3✔
562
                if resourceGroupName == "" {
3✔
563
                        resourceGroupName = d.cloud.ResourceGroup
×
564
                }
×
565
                blobClient, err := d.clientFactory.GetBlobContainerClientForSub(subsID)
3✔
566
                if err != nil {
3✔
567
                        return nil, status.Error(codes.Internal, err.Error())
×
568
                }
×
569

570
                blobContainer, err := blobClient.Get(ctx, resourceGroupName, accountName, containerName)
3✔
571
                if err != nil {
4✔
572
                        return nil, status.Error(codes.Internal, err.Error())
1✔
573
                }
1✔
574
                if blobContainer.ContainerProperties == nil {
3✔
575
                        return nil, status.Errorf(codes.Internal, "ContainerProperties of volume(%s) is nil", volumeID)
1✔
576
                }
1✔
577
                exist = blobContainer.ContainerProperties.Deleted != nil && !*blobContainer.ContainerProperties.Deleted
1✔
578
        }
579
        if !exist {
2✔
580
                return nil, status.Errorf(codes.NotFound, "requested volume(%s) does not exist", volumeID)
1✔
581
        }
1✔
582
        klog.V(2).Infof("ValidateVolumeCapabilities on volume(%s) succeeded", volumeID)
×
583

×
584
        // blob driver supports all AccessModes, no need to check capabilities here
×
585
        return &csi.ValidateVolumeCapabilitiesResponse{
×
586
                Confirmed: &csi.ValidateVolumeCapabilitiesResponse_Confirmed{
×
587
                        VolumeCapabilities: req.GetVolumeCapabilities(),
×
588
                },
×
589
                Message: "",
×
590
        }, nil
×
591
}
592

593
// ControllerModifyVolume modify volume
594
func (d *Driver) ControllerModifyVolume(_ context.Context, _ *csi.ControllerModifyVolumeRequest) (*csi.ControllerModifyVolumeResponse, error) {
×
595
        return nil, status.Error(codes.Unimplemented, "")
×
596
}
×
597

598
func (d *Driver) ControllerPublishVolume(_ context.Context, _ *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
1✔
599
        return nil, status.Error(codes.Unimplemented, "ControllerPublishVolume is not yet implemented")
1✔
600
}
1✔
601

602
func (d *Driver) ControllerUnpublishVolume(_ context.Context, _ *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
1✔
603
        return nil, status.Error(codes.Unimplemented, "ControllerUnpublishVolume is not yet implemented")
1✔
604
}
1✔
605

606
// ControllerGetVolume get volume
607
func (d *Driver) ControllerGetVolume(context.Context, *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
1✔
608
        return nil, status.Error(codes.Unimplemented, "ControllerGetVolume is not yet implemented")
1✔
609
}
1✔
610

611
// GetCapacity returns the capacity of the total available storage pool
612
func (d *Driver) GetCapacity(_ context.Context, _ *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
1✔
613
        return nil, status.Error(codes.Unimplemented, "GetCapacity is not yet implemented")
1✔
614
}
1✔
615

616
// ListVolumes return all available volumes
617
func (d *Driver) ListVolumes(_ context.Context, _ *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
1✔
618
        return nil, status.Error(codes.Unimplemented, "ListVolumes is not yet implemented")
1✔
619
}
1✔
620

621
// CreateSnapshot create snapshot
622
func (d *Driver) CreateSnapshot(_ context.Context, _ *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
1✔
623
        return nil, status.Error(codes.Unimplemented, "CreateSnapshot is not yet implemented")
1✔
624
}
1✔
625

626
// DeleteSnapshot delete snapshot
627
func (d *Driver) DeleteSnapshot(_ context.Context, _ *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
1✔
628
        return nil, status.Error(codes.Unimplemented, "DeleteSnapshot is not yet implemented")
1✔
629
}
1✔
630

631
// ListSnapshots list snapshots
632
func (d *Driver) ListSnapshots(_ context.Context, _ *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
1✔
633
        return nil, status.Error(codes.Unimplemented, "ListSnapshots is not yet implemented")
1✔
634
}
1✔
635

636
// ControllerGetCapabilities returns the capabilities of the Controller plugin
637
func (d *Driver) ControllerGetCapabilities(_ context.Context, _ *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
1✔
638
        return &csi.ControllerGetCapabilitiesResponse{
1✔
639
                Capabilities: d.Cap,
1✔
640
        }, nil
1✔
641
}
1✔
642

643
// ControllerExpandVolume controller expand volume
644
func (d *Driver) ControllerExpandVolume(_ context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
4✔
645
        if len(req.GetVolumeId()) == 0 {
5✔
646
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
647
        }
1✔
648

649
        if req.GetCapacityRange() == nil {
4✔
650
                return nil, status.Error(codes.InvalidArgument, "Capacity Range missing in request")
1✔
651
        }
1✔
652

653
        if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_EXPAND_VOLUME); err != nil {
2✔
654
                return nil, status.Errorf(codes.Internal, "invalid expand volume req: %v", req)
×
655
        }
×
656

657
        volSizeBytes := int64(req.GetCapacityRange().GetRequiredBytes())
2✔
658
        requestGiB := int64(util.RoundUpGiB(volSizeBytes))
2✔
659

2✔
660
        if volSizeBytes > containerMaxSize {
3✔
661
                return nil, status.Errorf(codes.OutOfRange, "required bytes (%d) exceeds the maximum supported bytes (%d)", volSizeBytes, containerMaxSize)
1✔
662
        }
1✔
663

664
        klog.V(2).Infof("ControllerExpandVolume(%s) successfully, currentQuota: %d Gi", req.VolumeId, requestGiB)
1✔
665

1✔
666
        return &csi.ControllerExpandVolumeResponse{CapacityBytes: req.GetCapacityRange().GetRequiredBytes()}, nil
1✔
667
}
668

669
// CreateBlobContainer creates a blob container
670
func (d *Driver) CreateBlobContainer(ctx context.Context, subsID, resourceGroupName, accountName, containerName string, secrets map[string]string) error {
9✔
671
        if containerName == "" {
10✔
672
                return fmt.Errorf("containerName is empty")
1✔
673
        }
1✔
674
        return wait.ExponentialBackoff(d.cloud.RequestBackoff(), func() (bool, error) {
16✔
675
                var err error
8✔
676
                if len(secrets) > 0 {
9✔
677
                        container, getErr := getContainerReference(containerName, secrets, d.getCloudEnvironment())
1✔
678
                        if getErr != nil {
2✔
679
                                return true, getErr
1✔
680
                        }
1✔
681
                        _, err = container.CreateIfNotExists(&azstorage.CreateContainerOptions{Access: azstorage.ContainerAccessTypePrivate})
×
682
                } else {
7✔
683
                        blobContainer := armstorage.BlobContainer{
7✔
684
                                ContainerProperties: &armstorage.ContainerProperties{
7✔
685
                                        PublicAccess: to.Ptr(armstorage.PublicAccessNone),
7✔
686
                                },
7✔
687
                        }
7✔
688
                        var blobClient blobcontainerclient.Interface
7✔
689
                        blobClient, err = d.clientFactory.GetBlobContainerClientForSub(subsID)
7✔
690
                        if err != nil {
7✔
691
                                return true, err
×
692
                        }
×
693
                        _, err = blobClient.CreateContainer(ctx, resourceGroupName, accountName, containerName, blobContainer)
7✔
694
                }
695
                if err != nil {
11✔
696
                        if strings.Contains(err.Error(), containerBeingDeletedDataplaneAPIError) ||
4✔
697
                                strings.Contains(err.Error(), containerBeingDeletedManagementAPIError) {
6✔
698
                                klog.Warningf("CreateContainer(%s, %s, %s) failed with error(%v), retry", resourceGroupName, accountName, containerName, err)
2✔
699
                                return false, nil
2✔
700
                        }
2✔
701
                }
702
                return true, err
5✔
703
        })
704
}
705

706
// DeleteBlobContainer deletes a blob container
707
func (d *Driver) DeleteBlobContainer(ctx context.Context, subsID, resourceGroupName, accountName, containerName string, secrets map[string]string) error {
8✔
708
        if containerName == "" {
9✔
709
                return fmt.Errorf("containerName is empty")
1✔
710
        }
1✔
711
        return wait.ExponentialBackoff(d.cloud.RequestBackoff(), func() (bool, error) {
14✔
712
                var err error
7✔
713
                if len(secrets) > 0 {
10✔
714
                        container, getErr := getContainerReference(containerName, secrets, d.getCloudEnvironment())
3✔
715
                        if getErr != nil {
6✔
716
                                return true, getErr
3✔
717
                        }
3✔
718
                        _, err = container.DeleteIfExists(nil)
×
719
                } else {
4✔
720
                        var blobClient blobcontainerclient.Interface
4✔
721
                        blobClient, err = d.clientFactory.GetBlobContainerClientForSub(subsID)
4✔
722
                        if err != nil {
4✔
723
                                return true, err
×
724
                        }
×
725
                        err = blobClient.DeleteContainer(ctx, resourceGroupName, accountName, containerName)
4✔
726
                }
727
                if err != nil {
7✔
728
                        if strings.Contains(err.Error(), containerBeingDeletedDataplaneAPIError) ||
3✔
729
                                strings.Contains(err.Error(), containerBeingDeletedManagementAPIError) ||
3✔
730
                                strings.Contains(err.Error(), statusCodeNotFound) ||
3✔
731
                                strings.Contains(err.Error(), httpCodeNotFound) {
5✔
732
                                klog.Warningf("delete container(%s) on account(%s) failed with error(%v), return as success", containerName, accountName, err)
2✔
733
                                return true, nil
2✔
734
                        }
2✔
735
                        return false, fmt.Errorf("failed to delete container(%s) on account(%s), error: %w", containerName, accountName, err)
1✔
736
                }
737
                return true, err
1✔
738
        })
739
}
740

741
// CopyBlobContainer copies a blob container in the same storage account
742
func (d *Driver) copyBlobContainer(req *csi.CreateVolumeRequest, accountSasToken string, authAzcopyEnv []string, dstContainerName, storageEndpointSuffix string) error {
6✔
743
        var sourceVolumeID string
6✔
744
        if req.GetVolumeContentSource() != nil && req.GetVolumeContentSource().GetVolume() != nil {
12✔
745
                sourceVolumeID = req.GetVolumeContentSource().GetVolume().GetVolumeId()
6✔
746

6✔
747
        }
6✔
748
        resourceGroupName, accountName, srcContainerName, _, _, err := GetContainerInfo(sourceVolumeID) //nolint:dogsled
6✔
749
        if err != nil {
8✔
750
                return status.Error(codes.NotFound, err.Error())
2✔
751
        }
2✔
752
        if srcContainerName == "" || dstContainerName == "" {
6✔
753
                return fmt.Errorf("srcContainerName(%s) or dstContainerName(%s) is empty", srcContainerName, dstContainerName)
2✔
754
        }
2✔
755

756
        srcPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, srcContainerName, accountSasToken)
2✔
757
        dstPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, dstContainerName, accountSasToken)
2✔
758

2✔
759
        jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
2✔
760
        klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
2✔
761
        switch jobState {
2✔
762
        case util.AzcopyJobError, util.AzcopyJobCompleted:
1✔
763
                return err
1✔
764
        case util.AzcopyJobRunning:
1✔
765
                return fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is %s%%", percent)
1✔
766
        case util.AzcopyJobNotFound:
×
767
                klog.V(2).Infof("copy blob container %s to %s", srcContainerName, dstContainerName)
×
768
                execFunc := func() error {
×
769
                        cmd := exec.Command("azcopy", "copy", srcPath, dstPath, "--recursive", "--check-length=false")
×
770
                        if len(authAzcopyEnv) > 0 {
×
771
                                cmd.Env = append(os.Environ(), authAzcopyEnv...)
×
772
                        }
×
773
                        if out, err := cmd.CombinedOutput(); err != nil {
×
774
                                return fmt.Errorf("exec error: %v, output: %v", err, string(out))
×
775
                        }
×
776
                        return nil
×
777
                }
778
                timeoutFunc := func() error {
×
779
                        _, percent, _ := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
×
780
                        return fmt.Errorf("timeout waiting for copy blob container %s to %s complete, current copy percent: %s%%", srcContainerName, dstContainerName, percent)
×
781
                }
×
782
                copyErr := util.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execFunc, timeoutFunc)
×
783
                if copyErr != nil {
×
784
                        klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", resourceGroupName, accountName, dstPath, copyErr)
×
785
                } else {
×
786
                        klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName)
×
787
                }
×
788
                return copyErr
×
789
        }
790
        return err
×
791
}
792

793
// copyVolume copies a volume form volume or snapshot, snapshot is not supported now
794
func (d *Driver) copyVolume(req *csi.CreateVolumeRequest, accountSASToken string, authAzcopyEnv []string, dstContainerName, storageEndpointSuffix string) error {
8✔
795
        vs := req.VolumeContentSource
8✔
796
        switch vs.Type.(type) {
8✔
797
        case *csi.VolumeContentSource_Snapshot:
2✔
798
                return status.Errorf(codes.InvalidArgument, "copy volume from volumeSnapshot is not supported")
2✔
799
        case *csi.VolumeContentSource_Volume:
6✔
800
                return d.copyBlobContainer(req, accountSASToken, authAzcopyEnv, dstContainerName, storageEndpointSuffix)
6✔
801
        default:
×
802
                return status.Errorf(codes.InvalidArgument, "%v is not a proper volume source", vs)
×
803
        }
804
}
805

806
// authorizeAzcopyWithIdentity returns auth env for azcopy using cluster identity
807
func (d *Driver) authorizeAzcopyWithIdentity() ([]string, error) {
7✔
808
        azureAuthConfig := d.cloud.Config.AzureAuthConfig
7✔
809
        var authAzcopyEnv []string
7✔
810
        if azureAuthConfig.UseManagedIdentityExtension {
11✔
811
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyAutoLoginType, MSI))
4✔
812
                if len(azureAuthConfig.UserAssignedIdentityID) > 0 {
5✔
813
                        klog.V(2).Infof("use user assigned managed identity to authorize azcopy")
1✔
814
                        authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyMSIClientID, azureAuthConfig.UserAssignedIdentityID))
1✔
815
                } else {
4✔
816
                        klog.V(2).Infof("use system-assigned managed identity to authorize azcopy")
3✔
817
                }
3✔
818
                return authAzcopyEnv, nil
4✔
819
        }
820
        if len(azureAuthConfig.AADClientSecret) > 0 {
5✔
821
                klog.V(2).Infof("use service principal to authorize azcopy")
2✔
822
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyAutoLoginType, SPN))
2✔
823
                if azureAuthConfig.AADClientID == "" || azureAuthConfig.TenantID == "" {
3✔
824
                        return []string{}, fmt.Errorf("AADClientID and TenantID must be set when use service principal")
1✔
825
                }
1✔
826
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopySPAApplicationID, azureAuthConfig.AADClientID))
1✔
827
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopySPAClientSecret, azureAuthConfig.AADClientSecret))
1✔
828
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyTenantID, azureAuthConfig.TenantID))
1✔
829
                klog.V(2).Infof(fmt.Sprintf("set AZCOPY_SPA_APPLICATION_ID=%s, AZCOPY_TENANT_ID=%s successfully", azureAuthConfig.AADClientID, azureAuthConfig.TenantID))
1✔
830

1✔
831
                return authAzcopyEnv, nil
1✔
832
        }
833
        return []string{}, fmt.Errorf("service principle or managed identity are both not set")
1✔
834
}
835

836
// getAzcopyAuth will only generate sas token for azcopy in following conditions:
837
// 1. secrets is not empty
838
// 2. driver is not using managed identity and service principal
839
// 3. azcopy returns AuthorizationPermissionMismatch error when using service principal or managed identity
840
func (d *Driver) getAzcopyAuth(ctx context.Context, accountName, accountKey, storageEndpointSuffix string, accountOptions *azure.AccountOptions, secrets map[string]string, secretName, secretNamespace string) (string, []string, error) {
6✔
841
        var authAzcopyEnv []string
6✔
842
        var err error
6✔
843
        useSasToken := false
6✔
844
        if !d.useDataPlaneAPI("", accountName) && len(secrets) == 0 && len(secretName) == 0 {
8✔
845
                // search in cache first
2✔
846
                if cache, err := d.azcopySasTokenCache.Get(accountName, azcache.CacheReadTypeDefault); err == nil && cache != nil {
2✔
847
                        klog.V(2).Infof("use sas token for account(%s) since this account is found in azcopySasTokenCache", accountName)
×
848
                        return cache.(string), nil, nil
×
849
                }
×
850

851
                authAzcopyEnv, err = d.authorizeAzcopyWithIdentity()
2✔
852
                if err != nil {
2✔
853
                        klog.Warningf("failed to authorize azcopy with identity, error: %v", err)
×
854
                } else {
2✔
855
                        if len(authAzcopyEnv) > 0 {
4✔
856
                                out, testErr := d.azcopy.TestListJobs(accountName, storageEndpointSuffix, authAzcopyEnv)
2✔
857
                                if testErr != nil {
2✔
858
                                        return "", nil, fmt.Errorf("azcopy list command failed with error(%v): %v", testErr, out)
×
859
                                }
×
860
                                if strings.Contains(out, authorizationPermissionMismatch) {
2✔
861
                                        klog.Warningf("azcopy list failed with AuthorizationPermissionMismatch error, should assign \"Storage Blob Data Contributor\" role to controller identity, fall back to use sas token, original output: %v", out)
×
862
                                        useSasToken = true
×
863
                                }
×
864
                        }
865
                }
866
        }
867

868
        if len(secrets) > 0 || len(secretName) > 0 || len(authAzcopyEnv) == 0 || useSasToken {
10✔
869
                if accountKey == "" {
8✔
870
                        if _, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
5✔
871
                                return "", nil, err
1✔
872
                        }
1✔
873
                }
874
                klog.V(2).Infof("generate sas token for account(%s)", accountName)
3✔
875
                sasToken, err := d.generateSASToken(accountName, accountKey, storageEndpointSuffix, d.sasTokenExpirationMinutes)
3✔
876
                return sasToken, nil, err
3✔
877
        }
878
        return "", authAzcopyEnv, nil
2✔
879
}
880

881
// isValidVolumeCapabilities validates the given VolumeCapability array is valid
882
func isValidVolumeCapabilities(volCaps []*csi.VolumeCapability) error {
28✔
883
        if len(volCaps) == 0 {
30✔
884
                return fmt.Errorf("volume capabilities missing in request")
2✔
885
        }
2✔
886
        for _, c := range volCaps {
52✔
887
                if c.GetBlock() != nil {
28✔
888
                        return fmt.Errorf("block volume capability not supported")
2✔
889
                }
2✔
890
        }
891
        return nil
24✔
892
}
893

894
func parseDays(dayStr string) (int32, error) {
3✔
895
        days, err := strconv.Atoi(dayStr)
3✔
896
        if err != nil {
4✔
897
                return 0, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid %s:%s in storage class", softDeleteBlobsField, dayStr))
1✔
898
        }
1✔
899
        if days <= 0 || days > 365 {
3✔
900
                return 0, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid %s:%s in storage class, should be in range [1, 365]", softDeleteBlobsField, dayStr))
1✔
901
        }
1✔
902

903
        return int32(days), nil
1✔
904
}
905

906
// generateSASToken generate a sas token for storage account
907
func (d *Driver) generateSASToken(accountName, accountKey, storageEndpointSuffix string, expiryTime int) (string, error) {
5✔
908
        // search in cache first
5✔
909
        cache, err := d.azcopySasTokenCache.Get(accountName, azcache.CacheReadTypeDefault)
5✔
910
        if err != nil {
5✔
911
                return "", fmt.Errorf("get(%s) from azcopySasTokenCache failed with error: %v", accountName, err)
×
912
        }
×
913
        if cache != nil {
5✔
914
                klog.V(2).Infof("use sas token for account(%s) since this account is found in azcopySasTokenCache", accountName)
×
915
                return cache.(string), nil
×
916
        }
×
917

918
        credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
5✔
919
        if err != nil {
8✔
920
                return "", status.Errorf(codes.Internal, fmt.Sprintf("failed to generate sas token in creating new shared key credential, accountName: %s, err: %s", accountName, err.Error()))
3✔
921
        }
3✔
922
        clientOptions := service.ClientOptions{}
2✔
923
        clientOptions.InsecureAllowCredentialWithHTTP = true
2✔
924
        serviceClient, err := service.NewClientWithSharedKeyCredential(fmt.Sprintf("https://%s.blob.%s/", accountName, storageEndpointSuffix), credential, &clientOptions)
2✔
925
        if err != nil {
2✔
926
                return "", status.Errorf(codes.Internal, fmt.Sprintf("failed to generate sas token in creating new client with shared key credential, accountName: %s, err: %s", accountName, err.Error()))
×
927
        }
×
928
        sasURL, err := serviceClient.GetSASURL(
2✔
929
                sas.AccountResourceTypes{Object: true, Service: false, Container: true},
2✔
930
                sas.AccountPermissions{Read: true, List: true, Write: true},
2✔
931
                time.Now().Add(time.Duration(expiryTime)*time.Minute),
2✔
932
                &service.GetSASURLOptions{StartTime: to.Ptr(time.Now())},
2✔
933
        )
2✔
934
        if err != nil {
2✔
935
                return "", err
×
936
        }
×
937
        u, err := url.Parse(sasURL)
2✔
938
        if err != nil {
2✔
939
                return "", err
×
940
        }
×
941
        sasToken := "?" + u.RawQuery
2✔
942
        d.azcopySasTokenCache.Set(accountName, sasToken)
2✔
943
        return sasToken, nil
2✔
944
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc