• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubernetes-sigs / blob-csi-driver / 8603508478

08 Apr 2024 04:26PM UTC coverage: 74.428%. Remained the same
8603508478

Pull #1339

github

web-flow
chore(deps): bump golang.org/x/net from 0.22.0 to 0.24.0

Bumps [golang.org/x/net](https://github.com/golang/net) from 0.22.0 to 0.24.0.
- [Commits](https://github.com/golang/net/compare/v0.22.0...v0.24.0)

---
updated-dependencies:
- dependency-name: golang.org/x/net
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Pull Request #1339: chore(deps): bump golang.org/x/net from 0.22.0 to 0.24.0

2212 of 2972 relevant lines covered (74.43%)

7.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.12
/pkg/blob/controllerserver.go
1
/*
2
Copyright 2017 The Kubernetes Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package blob
18

19
import (
20
        "context"
21
        "fmt"
22
        "net/url"
23
        "os"
24
        "os/exec"
25
        "strconv"
26
        "strings"
27
        "time"
28

29
        "google.golang.org/grpc/codes"
30
        "google.golang.org/grpc/status"
31

32
        "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
33
        "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage"
34
        "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
35
        "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
36
        "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
37
        "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2021-09-01/storage"
38
        azstorage "github.com/Azure/azure-sdk-for-go/storage"
39
        "github.com/container-storage-interface/spec/lib/go/csi"
40

41
        "k8s.io/apimachinery/pkg/util/wait"
42
        "k8s.io/klog/v2"
43
        "k8s.io/utils/pointer"
44

45
        "sigs.k8s.io/blob-csi-driver/pkg/util"
46
        "sigs.k8s.io/cloud-provider-azure/pkg/azclient/blobcontainerclient"
47
        azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache"
48
        "sigs.k8s.io/cloud-provider-azure/pkg/metrics"
49
        "sigs.k8s.io/cloud-provider-azure/pkg/provider"
50
        azure "sigs.k8s.io/cloud-provider-azure/pkg/provider"
51
)
52

53
// Package-level string constants used by the controller server.
const (
54
        // privateEndpoint is the networkEndpointType parameter value that
        // CreateVolume compares against (case-insensitively) to decide whether
        // a private endpoint should be created for the storage account.
        privateEndpoint = "privateendpoint"
55

56
        // The azcopy* constants hold environment variable names (per their
        // values). NOTE(review): presumably exported to the azcopy process when
        // authenticating volume copies — confirm against the azcopy auth code,
        // which is not part of this view.
        azcopyAutoLoginType             = "AZCOPY_AUTO_LOGIN_TYPE"
57
        azcopySPAApplicationID          = "AZCOPY_SPA_APPLICATION_ID"
58
        azcopySPAClientSecret           = "AZCOPY_SPA_CLIENT_SECRET"
59
        azcopyTenantID                  = "AZCOPY_TENANT_ID"
60
        azcopyMSIClientID               = "AZCOPY_MSI_CLIENT_ID"
61
        // MSI and SPN are exported identifiers.
        // NOTE(review): presumably the accepted values for AZCOPY_AUTO_LOGIN_TYPE
        // (managed identity vs. service principal) — TODO confirm.
        MSI                             = "MSI"
62
        SPN                             = "SPN"
63
        // NOTE(review): looks like a storage service error code matched against
        // error text elsewhere in the package — confirm at the use site.
        authorizationPermissionMismatch = "AuthorizationPermissionMismatch"
64
)
65

66
// CreateVolume provisions a volume
67
func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
24✔
68
        if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
24✔
69
                klog.Errorf("invalid create volume req: %v", req)
×
70
                return nil, err
×
71
        }
×
72

73
        volName := req.GetName()
24✔
74
        if len(volName) == 0 {
25✔
75
                return nil, status.Error(codes.InvalidArgument, "CreateVolume Name must be provided")
1✔
76
        }
1✔
77

78
        if err := isValidVolumeCapabilities(req.GetVolumeCapabilities()); err != nil {
25✔
79
                return nil, status.Error(codes.InvalidArgument, err.Error())
2✔
80
        }
2✔
81

82
        if acquired := d.volumeLocks.TryAcquire(volName); !acquired {
21✔
83
                // logging the job status if it's volume cloning
×
84
                if req.GetVolumeContentSource() != nil {
×
85
                        jobState, percent, err := d.azcopy.GetAzcopyJob(volName, []string{})
×
86
                        return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsWithAzcopyFmt, volName, jobState, percent, err)
×
87
                }
×
88
                return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName)
×
89
        }
90
        defer d.volumeLocks.Release(volName)
21✔
91

21✔
92
        volSizeBytes := int64(req.GetCapacityRange().GetRequiredBytes())
21✔
93
        requestGiB := int(util.RoundUpGiB(volSizeBytes))
21✔
94

21✔
95
        parameters := req.GetParameters()
21✔
96
        if parameters == nil {
22✔
97
                parameters = make(map[string]string)
1✔
98
        }
1✔
99
        var storageAccountType, subsID, resourceGroup, location, account, containerName, containerNamePrefix, protocol, customTags, secretName, secretNamespace, pvcNamespace string
21✔
100
        var isHnsEnabled, requireInfraEncryption, enableBlobVersioning, createPrivateEndpoint, enableNfsV3 *bool
21✔
101
        var vnetResourceGroup, vnetName, subnetName, accessTier, networkEndpointType, storageEndpointSuffix, fsGroupChangePolicy string
21✔
102
        var matchTags, useDataPlaneAPI, getLatestAccountKey bool
21✔
103
        var softDeleteBlobs, softDeleteContainers int32
21✔
104
        var vnetResourceIDs []string
21✔
105
        var err error
21✔
106
        // set allowBlobPublicAccess as false by default
21✔
107
        allowBlobPublicAccess := pointer.Bool(false)
21✔
108

21✔
109
        containerNameReplaceMap := map[string]string{}
21✔
110

21✔
111
        // store account key to k8s secret by default
21✔
112
        storeAccountKey := true
21✔
113

21✔
114
        // Apply ProvisionerParameters (case-insensitive). We leave validation of
21✔
115
        // the values to the cloud provider.
21✔
116
        for k, v := range parameters {
131✔
117
                switch strings.ToLower(k) {
110✔
118
                case skuNameField:
10✔
119
                        storageAccountType = v
10✔
120
                case storageAccountTypeField:
11✔
121
                        storageAccountType = v
11✔
122
                case locationField:
10✔
123
                        location = v
10✔
124
                case storageAccountField:
11✔
125
                        account = v
11✔
126
                case subscriptionIDField:
2✔
127
                        subsID = v
2✔
128
                case resourceGroupField:
11✔
129
                        resourceGroup = v
11✔
130
                case containerNameField:
12✔
131
                        containerName = v
12✔
132
                case containerNamePrefixField:
2✔
133
                        containerNamePrefix = v
2✔
134
                case protocolField:
11✔
135
                        protocol = v
11✔
136
                case tagsField:
1✔
137
                        customTags = v
1✔
138
                case matchTagsField:
1✔
139
                        matchTags = strings.EqualFold(v, trueValue)
1✔
140
                case secretNameField:
×
141
                        secretName = v
×
142
                case secretNamespaceField:
×
143
                        secretNamespace = v
×
144
                case isHnsEnabledField:
×
145
                        if strings.EqualFold(v, trueValue) {
×
146
                                isHnsEnabled = pointer.Bool(true)
×
147
                        }
×
148
                case softDeleteBlobsField:
×
149
                        days, err := parseDays(v)
×
150
                        if err != nil {
×
151
                                return nil, err
×
152
                        }
×
153
                        softDeleteBlobs = days
×
154
                case softDeleteContainersField:
×
155
                        days, err := parseDays(v)
×
156
                        if err != nil {
×
157
                                return nil, err
×
158
                        }
×
159
                        softDeleteContainers = days
×
160
                case enableBlobVersioningField:
×
161
                        enableBlobVersioning = pointer.Bool(strings.EqualFold(v, trueValue))
×
162
                case storeAccountKeyField:
4✔
163
                        if strings.EqualFold(v, falseValue) {
7✔
164
                                storeAccountKey = false
3✔
165
                        }
3✔
166
                case getLatestAccountKeyField:
1✔
167
                        if getLatestAccountKey, err = strconv.ParseBool(v); err != nil {
2✔
168
                                return nil, status.Errorf(codes.InvalidArgument, "invalid %s: %s in volume context", getLatestAccountKeyField, v)
1✔
169
                        }
1✔
170
                case allowBlobPublicAccessField:
×
171
                        if strings.EqualFold(v, trueValue) {
×
172
                                allowBlobPublicAccess = pointer.Bool(true)
×
173
                        }
×
174
                case requireInfraEncryptionField:
×
175
                        if strings.EqualFold(v, trueValue) {
×
176
                                requireInfraEncryption = pointer.Bool(true)
×
177
                        }
×
178
                case pvcNamespaceKey:
×
179
                        pvcNamespace = v
×
180
                        containerNameReplaceMap[pvcNamespaceMetadata] = v
×
181
                case pvcNameKey:
×
182
                        containerNameReplaceMap[pvcNameMetadata] = v
×
183
                case pvNameKey:
×
184
                        containerNameReplaceMap[pvNameMetadata] = v
×
185
                case serverNameField:
×
186
                case storageAuthTypeField:
1✔
187
                case storageIdentityClientIDField:
1✔
188
                case storageIdentityObjectIDField:
1✔
189
                case storageIdentityResourceIDField:
1✔
190
                case msiEndpointField:
1✔
191
                case storageAADEndpointField:
1✔
192
                        // no op, only used in NodeStageVolume
193
                case storageEndpointSuffixField:
×
194
                        storageEndpointSuffix = v
×
195
                case vnetResourceGroupField:
×
196
                        vnetResourceGroup = v
×
197
                case vnetNameField:
×
198
                        vnetName = v
×
199
                case subnetNameField:
1✔
200
                        subnetName = v
1✔
201
                case accessTierField:
×
202
                        accessTier = v
×
203
                case networkEndpointTypeField:
1✔
204
                        networkEndpointType = v
1✔
205
                case mountPermissionsField:
12✔
206
                        // only do validations here, used in NodeStageVolume, NodePublishVolume
12✔
207
                        if v != "" {
24✔
208
                                if _, err := strconv.ParseUint(v, 8, 32); err != nil {
13✔
209
                                        return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid mountPermissions %s in storage class", v))
1✔
210
                                }
1✔
211
                        }
212
                case useDataPlaneAPIField:
1✔
213
                        useDataPlaneAPI = strings.EqualFold(v, trueValue)
1✔
214
                case fsGroupChangePolicyField:
1✔
215
                        fsGroupChangePolicy = v
1✔
216
                default:
1✔
217
                        return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid parameter %q in storage class", k))
1✔
218
                }
219
        }
220

221
        if pointer.BoolDeref(enableBlobVersioning, false) {
18✔
222
                if isNFSProtocol(protocol) || pointer.BoolDeref(isHnsEnabled, false) {
×
223
                        return nil, status.Errorf(codes.InvalidArgument, "enableBlobVersioning is not supported for NFS protocol or HNS enabled account")
×
224
                }
×
225
        }
226

227
        if !isSupportedFSGroupChangePolicy(fsGroupChangePolicy) {
19✔
228
                return nil, status.Errorf(codes.InvalidArgument, "fsGroupChangePolicy(%s) is not supported, supported fsGroupChangePolicy list: %v", fsGroupChangePolicy, supportedFSGroupChangePolicyList)
1✔
229
        }
1✔
230

231
        if matchTags && account != "" {
18✔
232
                return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("matchTags must set as false when storageAccount(%s) is provided", account))
1✔
233
        }
1✔
234

235
        if subsID != "" && subsID != d.cloud.SubscriptionID {
18✔
236
                if isNFSProtocol(protocol) {
3✔
237
                        return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("NFS protocol is not supported in cross subscription(%s)", subsID))
1✔
238
                }
1✔
239
                if !storeAccountKey {
2✔
240
                        return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("storeAccountKey must set as true in cross subscription(%s)", subsID))
1✔
241
                }
1✔
242
        }
243

244
        if resourceGroup == "" {
20✔
245
                resourceGroup = d.cloud.ResourceGroup
6✔
246
        }
6✔
247

248
        if secretNamespace == "" {
28✔
249
                if pvcNamespace == "" {
28✔
250
                        secretNamespace = defaultNamespace
14✔
251
                } else {
14✔
252
                        secretNamespace = pvcNamespace
×
253
                }
×
254
        }
255

256
        if protocol == "" {
19✔
257
                protocol = Fuse
5✔
258
        }
5✔
259
        if !isSupportedProtocol(protocol) {
15✔
260
                return nil, status.Errorf(codes.InvalidArgument, "protocol(%s) is not supported, supported protocol list: %v", protocol, supportedProtocolList)
1✔
261
        }
1✔
262
        if !isSupportedAccessTier(accessTier) {
13✔
263
                return nil, status.Errorf(codes.InvalidArgument, "accessTier(%s) is not supported, supported AccessTier list: %v", accessTier, armstorage.PossibleAccessTierValues())
×
264
        }
×
265

266
        if containerName != "" && containerNamePrefix != "" {
14✔
267
                return nil, status.Errorf(codes.InvalidArgument, "containerName(%s) and containerNamePrefix(%s) could not be specified together", containerName, containerNamePrefix)
1✔
268
        }
1✔
269
        if !isSupportedContainerNamePrefix(containerNamePrefix) {
13✔
270
                return nil, status.Errorf(codes.InvalidArgument, "containerNamePrefix(%s) can only contain lowercase letters, numbers, hyphens, and length should be less than 21", containerNamePrefix)
1✔
271
        }
1✔
272

273
        enableHTTPSTrafficOnly := true
11✔
274
        if strings.EqualFold(networkEndpointType, privateEndpoint) {
12✔
275
                if strings.Contains(subnetName, ",") {
2✔
276
                        return nil, status.Errorf(codes.InvalidArgument, "subnetName(%s) can only contain one subnet for private endpoint", subnetName)
1✔
277
                }
1✔
278
                createPrivateEndpoint = pointer.BoolPtr(true)
×
279
        }
280
        accountKind := string(armstorage.KindStorageV2)
10✔
281
        if isNFSProtocol(protocol) {
11✔
282
                isHnsEnabled = pointer.Bool(true)
1✔
283
                enableNfsV3 = pointer.Bool(true)
1✔
284
                // NFS protocol does not need account key
1✔
285
                storeAccountKey = false
1✔
286
                if !pointer.BoolDeref(createPrivateEndpoint, false) {
2✔
287
                        // set VirtualNetworkResourceIDs for storage account firewall setting
1✔
288
                        subnets := strings.Split(subnetName, ",")
1✔
289
                        for _, subnet := range subnets {
2✔
290
                                subnet = strings.TrimSpace(subnet)
1✔
291
                                vnetResourceID := d.getSubnetResourceID(vnetResourceGroup, vnetName, subnet)
1✔
292
                                klog.V(2).Infof("set vnetResourceID(%s) for NFS protocol", vnetResourceID)
1✔
293
                                vnetResourceIDs = []string{vnetResourceID}
1✔
294
                                if err := d.updateSubnetServiceEndpoints(ctx, vnetResourceGroup, vnetName, subnet); err != nil {
2✔
295
                                        return nil, status.Errorf(codes.Internal, "update service endpoints failed with error: %v", err)
1✔
296
                                }
1✔
297
                        }
298
                }
299
        }
300

301
        if strings.HasPrefix(strings.ToLower(storageAccountType), "premium") {
10✔
302
                accountKind = string(armstorage.KindBlockBlobStorage)
1✔
303
        }
1✔
304
        if IsAzureStackCloud(d.cloud) {
10✔
305
                accountKind = string(armstorage.KindStorage)
1✔
306
                if storageAccountType != "" && storageAccountType != string(armstorage.SKUNameStandardLRS) && storageAccountType != string(armstorage.SKUNamePremiumLRS) {
2✔
307
                        return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("Invalid skuName value: %s, as Azure Stack only supports %s and %s Storage Account types.", storageAccountType, storage.SkuNamePremiumLRS, storage.SkuNameStandardLRS))
1✔
308
                }
1✔
309
        }
310

311
        tags, err := util.ConvertTagsToMap(customTags)
8✔
312
        if err != nil {
9✔
313
                return nil, status.Errorf(codes.InvalidArgument, err.Error())
1✔
314
        }
1✔
315

316
        if strings.TrimSpace(storageEndpointSuffix) == "" {
14✔
317
                storageEndpointSuffix = d.getStorageEndPointSuffix()
7✔
318
        }
7✔
319

320
        accountOptions := &azure.AccountOptions{
7✔
321
                Name:                            account,
7✔
322
                Type:                            storageAccountType,
7✔
323
                Kind:                            accountKind,
7✔
324
                SubscriptionID:                  subsID,
7✔
325
                ResourceGroup:                   resourceGroup,
7✔
326
                Location:                        location,
7✔
327
                EnableHTTPSTrafficOnly:          enableHTTPSTrafficOnly,
7✔
328
                VirtualNetworkResourceIDs:       vnetResourceIDs,
7✔
329
                Tags:                            tags,
7✔
330
                MatchTags:                       matchTags,
7✔
331
                IsHnsEnabled:                    isHnsEnabled,
7✔
332
                EnableNfsV3:                     enableNfsV3,
7✔
333
                AllowBlobPublicAccess:           allowBlobPublicAccess,
7✔
334
                RequireInfrastructureEncryption: requireInfraEncryption,
7✔
335
                VNetResourceGroup:               vnetResourceGroup,
7✔
336
                VNetName:                        vnetName,
7✔
337
                SubnetName:                      subnetName,
7✔
338
                AccessTier:                      accessTier,
7✔
339
                CreatePrivateEndpoint:           createPrivateEndpoint,
7✔
340
                StorageType:                     provider.StorageTypeBlob,
7✔
341
                StorageEndpointSuffix:           storageEndpointSuffix,
7✔
342
                EnableBlobVersioning:            enableBlobVersioning,
7✔
343
                SoftDeleteBlobs:                 softDeleteBlobs,
7✔
344
                SoftDeleteContainers:            softDeleteContainers,
7✔
345
                GetLatestAccountKey:             getLatestAccountKey,
7✔
346
        }
7✔
347

7✔
348
        var volumeID string
7✔
349
        requestName := "controller_create_volume"
7✔
350
        if req.GetVolumeContentSource() != nil {
9✔
351
                switch req.VolumeContentSource.Type.(type) {
2✔
352
                case *csi.VolumeContentSource_Snapshot:
1✔
353
                        requestName = "controller_create_volume_from_snapshot"
1✔
354
                case *csi.VolumeContentSource_Volume:
1✔
355
                        requestName = "controller_create_volume_from_volume"
1✔
356
                }
357
        }
358
        mc := metrics.NewMetricContext(blobCSIDriverName, requestName, d.cloud.ResourceGroup, d.cloud.SubscriptionID, d.Name)
7✔
359
        isOperationSucceeded := false
7✔
360
        defer func() {
14✔
361
                mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
7✔
362
        }()
7✔
363

364
        var accountKey string
7✔
365
        accountName := account
7✔
366
        secrets := req.GetSecrets()
7✔
367
        if len(secrets) == 0 && accountName == "" {
8✔
368
                if v, ok := d.volMap.Load(volName); ok {
1✔
369
                        accountName = v.(string)
×
370
                } else {
1✔
371
                        lockKey := fmt.Sprintf("%s%s%s%s%s%v", storageAccountType, accountKind, resourceGroup, location, protocol, pointer.BoolDeref(createPrivateEndpoint, false))
1✔
372
                        // search in cache first
1✔
373
                        cache, err := d.accountSearchCache.Get(lockKey, azcache.CacheReadTypeDefault)
1✔
374
                        if err != nil {
1✔
375
                                return nil, status.Errorf(codes.Internal, err.Error())
×
376
                        }
×
377
                        if cache != nil {
1✔
378
                                accountName = cache.(string)
×
379
                        } else {
1✔
380
                                d.volLockMap.LockEntry(lockKey)
1✔
381
                                err = wait.ExponentialBackoff(d.cloud.RequestBackoff(), func() (bool, error) {
2✔
382
                                        var retErr error
1✔
383
                                        accountName, accountKey, retErr = d.cloud.EnsureStorageAccount(ctx, accountOptions, protocol)
1✔
384
                                        if isRetriableError(retErr) {
1✔
385
                                                klog.Warningf("EnsureStorageAccount(%s) failed with error(%v), waiting for retrying", account, retErr)
×
386
                                                return false, nil
×
387
                                        }
×
388
                                        return true, retErr
1✔
389
                                })
390
                                d.volLockMap.UnlockEntry(lockKey)
1✔
391
                                if err != nil {
2✔
392
                                        return nil, status.Errorf(codes.Internal, "ensure storage account failed with %v", err)
1✔
393
                                }
1✔
394
                                d.accountSearchCache.Set(lockKey, accountName)
×
395
                                d.volMap.Store(volName, accountName)
×
396
                        }
397
                }
398
        }
399

400
        if pointer.BoolDeref(createPrivateEndpoint, false) && isNFSProtocol(protocol) {
6✔
401
                // As for blobfuse/blobfuse2, serverName, i.e.,AZURE_STORAGE_BLOB_ENDPOINT env variable can't include
×
402
                // "privatelink", issue: https://github.com/Azure/azure-storage-fuse/issues/1014
×
403
                //
×
404
                // And use public endpoint will be befine to blobfuse/blobfuse2, because it will be resolved to private endpoint
×
405
                // by private dns zone, which includes CNAME record, documented here:
×
406
                // https://learn.microsoft.com/en-us/azure/storage/common/storage-private-endpoints?toc=%2Fazure%2Fstorage%2Fblobs%2Ftoc.json&bc=%2Fazure%2Fstorage%2Fblobs%2Fbreadcrumb%2Ftoc.json#dns-changes-for-private-endpoints
×
407
                setKeyValueInMap(parameters, serverNameField, fmt.Sprintf("%s.privatelink.blob.%s", accountName, storageEndpointSuffix))
×
408
        }
×
409

410
        accountOptions.Name = accountName
6✔
411
        if len(secrets) == 0 && useDataPlaneAPI {
7✔
412
                if accountKey == "" {
2✔
413
                        if accountName, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
2✔
414
                                return nil, status.Errorf(codes.Internal, "failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
1✔
415
                        }
1✔
416
                }
417
                secrets = createStorageAccountSecret(accountName, accountKey)
×
418
        }
419

420
        // replace pv/pvc name namespace metadata in subDir
421
        containerName = replaceWithMap(containerName, containerNameReplaceMap)
5✔
422
        validContainerName := containerName
5✔
423
        if validContainerName == "" {
5✔
424
                validContainerName = volName
×
425
                if containerNamePrefix != "" {
×
426
                        validContainerName = containerNamePrefix + "-" + volName
×
427
                }
×
428
                validContainerName = getValidContainerName(validContainerName, protocol)
×
429
                setKeyValueInMap(parameters, containerNameField, validContainerName)
×
430
        }
431

432
        if req.GetVolumeContentSource() != nil {
7✔
433
                accountSASToken, authAzcopyEnv, err := d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace)
2✔
434
                if err != nil {
2✔
435
                        return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
×
436
                }
×
437
                if err := d.copyVolume(req, accountSASToken, authAzcopyEnv, validContainerName, storageEndpointSuffix); err != nil {
4✔
438
                        return nil, err
2✔
439
                }
2✔
440
        } else {
3✔
441
                klog.V(2).Infof("begin to create container(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d)", validContainerName, accountName, storageAccountType, subsID, resourceGroup, location, requestGiB)
3✔
442
                if err := d.CreateBlobContainer(ctx, subsID, resourceGroup, accountName, validContainerName, secrets); err != nil {
4✔
443
                        return nil, status.Errorf(codes.Internal, "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v", validContainerName, accountName, storageAccountType, resourceGroup, location, requestGiB, err)
1✔
444
                }
1✔
445
        }
446

447
        if storeAccountKey && len(req.GetSecrets()) == 0 {
4✔
448
                if accountKey == "" {
4✔
449
                        if accountName, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
3✔
450
                                return nil, status.Errorf(codes.Internal, "failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
1✔
451
                        }
1✔
452
                }
453

454
                secretName, err := setAzureCredentials(ctx, d.KubeClient, accountName, accountKey, secretNamespace)
1✔
455
                if err != nil {
1✔
456
                        return nil, status.Errorf(codes.Internal, "failed to store storage account key: %v", err)
×
457
                }
×
458
                if secretName != "" {
1✔
459
                        klog.V(2).Infof("store account key to k8s secret(%v) in %s namespace", secretName, secretNamespace)
×
460
                }
×
461
        }
462

463
        var uuid string
1✔
464
        if containerName != "" {
2✔
465
                // add volume name as suffix to differentiate volumeID since "containerName" is specified
1✔
466
                // not necessary for dynamic container name creation since volumeID already contains volume name
1✔
467
                uuid = volName
1✔
468
        }
1✔
469
        volumeID = fmt.Sprintf(volumeIDTemplate, resourceGroup, accountName, validContainerName, uuid, secretNamespace, subsID)
1✔
470
        klog.V(2).Infof("create container %s on storage account %s successfully", validContainerName, accountName)
1✔
471

1✔
472
        if useDataPlaneAPI {
1✔
473
                d.dataPlaneAPIVolCache.Set(volumeID, "")
×
474
                d.dataPlaneAPIVolCache.Set(accountName, "")
×
475
        }
×
476

477
        isOperationSucceeded = true
1✔
478
        // reset secretNamespace field in VolumeContext
1✔
479
        setKeyValueInMap(parameters, secretNamespaceField, secretNamespace)
1✔
480
        return &csi.CreateVolumeResponse{
1✔
481
                Volume: &csi.Volume{
1✔
482
                        VolumeId:      volumeID,
1✔
483
                        CapacityBytes: req.GetCapacityRange().GetRequiredBytes(),
1✔
484
                        VolumeContext: parameters,
1✔
485
                        ContentSource: req.GetVolumeContentSource(),
1✔
486
                },
1✔
487
        }, nil
1✔
488
}
489

490
// DeleteVolume delete a volume
491
func (d *Driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
5✔
492
        volumeID := req.GetVolumeId()
5✔
493
        if len(volumeID) == 0 {
6✔
494
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
495
        }
1✔
496

497
        if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
4✔
498
                return nil, status.Errorf(codes.Internal, "invalid delete volume req: %v", req)
×
499
        }
×
500

501
        if acquired := d.volumeLocks.TryAcquire(volumeID); !acquired {
4✔
502
                return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
×
503
        }
×
504
        defer d.volumeLocks.Release(volumeID)
4✔
505

4✔
506
        resourceGroupName, accountName, containerName, _, subsID, err := GetContainerInfo(volumeID)
4✔
507
        if err != nil {
5✔
508
                // According to CSI Driver Sanity Tester, should succeed when an invalid volume id is used
1✔
509
                klog.Errorf("GetContainerInfo(%s) in DeleteVolume failed with error: %v", volumeID, err)
1✔
510
                return &csi.DeleteVolumeResponse{}, nil
1✔
511
        }
1✔
512

513
        secrets := req.GetSecrets()
3✔
514
        if len(secrets) == 0 && d.useDataPlaneAPI(volumeID, accountName) {
4✔
515
                _, accountName, accountKey, _, _, err := d.GetAuthEnv(ctx, volumeID, "", nil, secrets)
1✔
516
                if err != nil {
2✔
517
                        return nil, status.Errorf(codes.Internal, "GetAuthEnv(%s) failed with %v", volumeID, err)
1✔
518
                }
1✔
519
                if accountName != "" && accountKey != "" {
×
520
                        secrets = createStorageAccountSecret(accountName, accountKey)
×
521
                }
×
522
        }
523

524
        mc := metrics.NewMetricContext(blobCSIDriverName, "controller_delete_volume", d.cloud.ResourceGroup, d.cloud.SubscriptionID, d.Name)
2✔
525
        isOperationSucceeded := false
2✔
526
        defer func() {
4✔
527
                mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
2✔
528
        }()
2✔
529

530
        if resourceGroupName == "" {
3✔
531
                resourceGroupName = d.cloud.ResourceGroup
1✔
532
        }
1✔
533
        klog.V(2).Infof("deleting container(%s) rg(%s) account(%s) volumeID(%s)", containerName, resourceGroupName, accountName, volumeID)
2✔
534
        if err := d.DeleteBlobContainer(ctx, subsID, resourceGroupName, accountName, containerName, secrets); err != nil {
4✔
535
                return nil, status.Errorf(codes.Internal, "failed to delete container(%s) under rg(%s) account(%s) volumeID(%s), error: %v", containerName, resourceGroupName, accountName, volumeID, err)
2✔
536
        }
2✔
537

538
        isOperationSucceeded = true
×
539
        klog.V(2).Infof("container(%s) under rg(%s) account(%s) volumeID(%s) is deleted successfully", containerName, resourceGroupName, accountName, volumeID)
×
540
        return &csi.DeleteVolumeResponse{}, nil
×
541
}
542

543
// ValidateVolumeCapabilities return the capabilities of the volume
544
func (d *Driver) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
8✔
545
        volumeID := req.GetVolumeId()
8✔
546
        if len(volumeID) == 0 {
9✔
547
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
548
        }
1✔
549
        if err := isValidVolumeCapabilities(req.GetVolumeCapabilities()); err != nil {
9✔
550
                return nil, status.Error(codes.InvalidArgument, err.Error())
2✔
551
        }
2✔
552

553
        resourceGroupName, accountName, containerName, _, subsID, err := GetContainerInfo(volumeID)
5✔
554
        if err != nil {
6✔
555
                klog.Errorf("GetContainerInfo(%s) in ValidateVolumeCapabilities failed with error: %v", volumeID, err)
1✔
556
                return nil, status.Error(codes.NotFound, err.Error())
1✔
557
        }
1✔
558

559
        var exist bool
4✔
560
        secrets := req.GetSecrets()
4✔
561
        if len(secrets) > 0 {
5✔
562
                container, err := getContainerReference(containerName, secrets, d.getCloudEnvironment())
1✔
563
                if err != nil {
2✔
564
                        return nil, status.Error(codes.Internal, err.Error())
1✔
565
                }
1✔
566
                exist, err = container.Exists()
×
567
                if err != nil {
×
568
                        return nil, status.Error(codes.Internal, err.Error())
×
569
                }
×
570
        } else {
3✔
571
                if resourceGroupName == "" {
3✔
572
                        resourceGroupName = d.cloud.ResourceGroup
×
573
                }
×
574
                blobClient, err := d.clientFactory.GetBlobContainerClientForSub(subsID)
3✔
575
                if err != nil {
3✔
576
                        return nil, status.Error(codes.Internal, err.Error())
×
577
                }
×
578

579
                blobContainer, err := blobClient.Get(ctx, resourceGroupName, accountName, containerName)
3✔
580
                if err != nil {
4✔
581
                        return nil, status.Error(codes.Internal, err.Error())
1✔
582
                }
1✔
583
                if blobContainer.ContainerProperties == nil {
3✔
584
                        return nil, status.Errorf(codes.Internal, "ContainerProperties of volume(%s) is nil", volumeID)
1✔
585
                }
1✔
586
                exist = blobContainer.ContainerProperties.Deleted != nil && !*blobContainer.ContainerProperties.Deleted
1✔
587
        }
588
        if !exist {
2✔
589
                return nil, status.Errorf(codes.NotFound, "requested volume(%s) does not exist", volumeID)
1✔
590
        }
1✔
591
        klog.V(2).Infof("ValidateVolumeCapabilities on volume(%s) succeeded", volumeID)
×
592

×
593
        // blob driver supports all AccessModes, no need to check capabilities here
×
594
        return &csi.ValidateVolumeCapabilitiesResponse{
×
595
                Confirmed: &csi.ValidateVolumeCapabilitiesResponse_Confirmed{
×
596
                        VolumeCapabilities: req.GetVolumeCapabilities(),
×
597
                },
×
598
                Message: "",
×
599
        }, nil
×
600
}
601

602
func (d *Driver) ControllerPublishVolume(_ context.Context, _ *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
1✔
603
        return nil, status.Error(codes.Unimplemented, "ControllerPublishVolume is not yet implemented")
1✔
604
}
1✔
605

606
func (d *Driver) ControllerUnpublishVolume(_ context.Context, _ *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
1✔
607
        return nil, status.Error(codes.Unimplemented, "ControllerUnpublishVolume is not yet implemented")
1✔
608
}
1✔
609

610
// ControllerGetVolume get volume
611
func (d *Driver) ControllerGetVolume(context.Context, *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
1✔
612
        return nil, status.Error(codes.Unimplemented, "ControllerGetVolume is not yet implemented")
1✔
613
}
1✔
614

615
// GetCapacity returns the capacity of the total available storage pool
616
func (d *Driver) GetCapacity(_ context.Context, _ *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
1✔
617
        return nil, status.Error(codes.Unimplemented, "GetCapacity is not yet implemented")
1✔
618
}
1✔
619

620
// ListVolumes return all available volumes
621
func (d *Driver) ListVolumes(_ context.Context, _ *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
1✔
622
        return nil, status.Error(codes.Unimplemented, "ListVolumes is not yet implemented")
1✔
623
}
1✔
624

625
// CreateSnapshot create snapshot
626
func (d *Driver) CreateSnapshot(_ context.Context, _ *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
1✔
627
        return nil, status.Error(codes.Unimplemented, "CreateSnapshot is not yet implemented")
1✔
628
}
1✔
629

630
// DeleteSnapshot delete snapshot
631
func (d *Driver) DeleteSnapshot(_ context.Context, _ *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
1✔
632
        return nil, status.Error(codes.Unimplemented, "DeleteSnapshot is not yet implemented")
1✔
633
}
1✔
634

635
// ListSnapshots list snapshots
636
func (d *Driver) ListSnapshots(_ context.Context, _ *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
1✔
637
        return nil, status.Error(codes.Unimplemented, "ListSnapshots is not yet implemented")
1✔
638
}
1✔
639

640
// ControllerGetCapabilities returns the capabilities of the Controller plugin
641
func (d *Driver) ControllerGetCapabilities(_ context.Context, _ *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
1✔
642
        return &csi.ControllerGetCapabilitiesResponse{
1✔
643
                Capabilities: d.Cap,
1✔
644
        }, nil
1✔
645
}
1✔
646

647
// ControllerExpandVolume controller expand volume
648
func (d *Driver) ControllerExpandVolume(_ context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
4✔
649
        if len(req.GetVolumeId()) == 0 {
5✔
650
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
651
        }
1✔
652

653
        if req.GetCapacityRange() == nil {
4✔
654
                return nil, status.Error(codes.InvalidArgument, "Capacity Range missing in request")
1✔
655
        }
1✔
656

657
        if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_EXPAND_VOLUME); err != nil {
2✔
658
                return nil, status.Errorf(codes.Internal, "invalid expand volume req: %v", req)
×
659
        }
×
660

661
        volSizeBytes := int64(req.GetCapacityRange().GetRequiredBytes())
2✔
662
        requestGiB := int64(util.RoundUpGiB(volSizeBytes))
2✔
663

2✔
664
        if volSizeBytes > containerMaxSize {
3✔
665
                return nil, status.Errorf(codes.OutOfRange, "required bytes (%d) exceeds the maximum supported bytes (%d)", volSizeBytes, containerMaxSize)
1✔
666
        }
1✔
667

668
        klog.V(2).Infof("ControllerExpandVolume(%s) successfully, currentQuota: %d Gi", req.VolumeId, requestGiB)
1✔
669

1✔
670
        return &csi.ControllerExpandVolumeResponse{CapacityBytes: req.GetCapacityRange().GetRequiredBytes()}, nil
1✔
671
}
672

673
// CreateBlobContainer creates a blob container
674
func (d *Driver) CreateBlobContainer(ctx context.Context, subsID, resourceGroupName, accountName, containerName string, secrets map[string]string) error {
9✔
675
        if containerName == "" {
10✔
676
                return fmt.Errorf("containerName is empty")
1✔
677
        }
1✔
678
        return wait.ExponentialBackoff(d.cloud.RequestBackoff(), func() (bool, error) {
16✔
679
                var err error
8✔
680
                if len(secrets) > 0 {
9✔
681
                        container, getErr := getContainerReference(containerName, secrets, d.getCloudEnvironment())
1✔
682
                        if getErr != nil {
2✔
683
                                return true, getErr
1✔
684
                        }
1✔
685
                        _, err = container.CreateIfNotExists(&azstorage.CreateContainerOptions{Access: azstorage.ContainerAccessTypePrivate})
×
686
                } else {
7✔
687
                        blobContainer := armstorage.BlobContainer{
7✔
688
                                ContainerProperties: &armstorage.ContainerProperties{
7✔
689
                                        PublicAccess: to.Ptr(armstorage.PublicAccessNone),
7✔
690
                                },
7✔
691
                        }
7✔
692
                        var blobClient blobcontainerclient.Interface
7✔
693
                        blobClient, err = d.clientFactory.GetBlobContainerClientForSub(subsID)
7✔
694
                        if err != nil {
7✔
695
                                return true, err
×
696
                        }
×
697
                        _, err = blobClient.CreateContainer(ctx, resourceGroupName, accountName, containerName, blobContainer)
7✔
698
                }
699
                if err != nil {
11✔
700
                        if strings.Contains(err.Error(), containerBeingDeletedDataplaneAPIError) ||
4✔
701
                                strings.Contains(err.Error(), containerBeingDeletedManagementAPIError) {
6✔
702
                                klog.Warningf("CreateContainer(%s, %s, %s) failed with error(%v), retry", resourceGroupName, accountName, containerName, err)
2✔
703
                                return false, nil
2✔
704
                        }
2✔
705
                }
706
                return true, err
5✔
707
        })
708
}
709

710
// DeleteBlobContainer deletes a blob container
711
func (d *Driver) DeleteBlobContainer(ctx context.Context, subsID, resourceGroupName, accountName, containerName string, secrets map[string]string) error {
8✔
712
        if containerName == "" {
9✔
713
                return fmt.Errorf("containerName is empty")
1✔
714
        }
1✔
715
        return wait.ExponentialBackoff(d.cloud.RequestBackoff(), func() (bool, error) {
14✔
716
                var err error
7✔
717
                if len(secrets) > 0 {
10✔
718
                        container, getErr := getContainerReference(containerName, secrets, d.getCloudEnvironment())
3✔
719
                        if getErr != nil {
6✔
720
                                return true, getErr
3✔
721
                        }
3✔
722
                        _, err = container.DeleteIfExists(nil)
×
723
                } else {
4✔
724
                        var blobClient blobcontainerclient.Interface
4✔
725
                        blobClient, err = d.clientFactory.GetBlobContainerClientForSub(subsID)
4✔
726
                        if err != nil {
4✔
727
                                return true, err
×
728
                        }
×
729
                        err = blobClient.DeleteContainer(ctx, resourceGroupName, accountName, containerName)
4✔
730
                }
731
                if err != nil {
7✔
732
                        if strings.Contains(err.Error(), containerBeingDeletedDataplaneAPIError) ||
3✔
733
                                strings.Contains(err.Error(), containerBeingDeletedManagementAPIError) ||
3✔
734
                                strings.Contains(err.Error(), statusCodeNotFound) ||
3✔
735
                                strings.Contains(err.Error(), httpCodeNotFound) {
5✔
736
                                klog.Warningf("delete container(%s) on account(%s) failed with error(%v), return as success", containerName, accountName, err)
2✔
737
                                return true, nil
2✔
738
                        }
2✔
739
                        return false, fmt.Errorf("failed to delete container(%s) on account(%s), error: %w", containerName, accountName, err)
1✔
740
                }
741
                return true, err
1✔
742
        })
743
}
744

745
// CopyBlobContainer copies a blob container in the same storage account
746
func (d *Driver) copyBlobContainer(req *csi.CreateVolumeRequest, accountSasToken string, authAzcopyEnv []string, dstContainerName, storageEndpointSuffix string) error {
6✔
747
        var sourceVolumeID string
6✔
748
        if req.GetVolumeContentSource() != nil && req.GetVolumeContentSource().GetVolume() != nil {
12✔
749
                sourceVolumeID = req.GetVolumeContentSource().GetVolume().GetVolumeId()
6✔
750

6✔
751
        }
6✔
752
        resourceGroupName, accountName, srcContainerName, _, _, err := GetContainerInfo(sourceVolumeID) //nolint:dogsled
6✔
753
        if err != nil {
8✔
754
                return status.Error(codes.NotFound, err.Error())
2✔
755
        }
2✔
756
        if srcContainerName == "" || dstContainerName == "" {
6✔
757
                return fmt.Errorf("srcContainerName(%s) or dstContainerName(%s) is empty", srcContainerName, dstContainerName)
2✔
758
        }
2✔
759

760
        srcPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, srcContainerName, accountSasToken)
2✔
761
        dstPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, dstContainerName, accountSasToken)
2✔
762

2✔
763
        jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
2✔
764
        klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
2✔
765
        switch jobState {
2✔
766
        case util.AzcopyJobError, util.AzcopyJobCompleted:
1✔
767
                return err
1✔
768
        case util.AzcopyJobRunning:
1✔
769
                return fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is %s%%", percent)
1✔
770
        case util.AzcopyJobNotFound:
×
771
                klog.V(2).Infof("copy blob container %s to %s", srcContainerName, dstContainerName)
×
772
                execFunc := func() error {
×
773
                        cmd := exec.Command("azcopy", "copy", srcPath, dstPath, "--recursive", "--check-length=false")
×
774
                        if len(authAzcopyEnv) > 0 {
×
775
                                cmd.Env = append(os.Environ(), authAzcopyEnv...)
×
776
                        }
×
777
                        if out, err := cmd.CombinedOutput(); err != nil {
×
778
                                return fmt.Errorf("exec error: %v, output: %v", err, string(out))
×
779
                        }
×
780
                        return nil
×
781
                }
782
                timeoutFunc := func() error {
×
783
                        _, percent, _ := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
×
784
                        return fmt.Errorf("timeout waiting for copy blob container %s to %s complete, current copy percent: %s%%", srcContainerName, dstContainerName, percent)
×
785
                }
×
786
                copyErr := util.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execFunc, timeoutFunc)
×
787
                if copyErr != nil {
×
788
                        klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", resourceGroupName, accountName, dstPath, copyErr)
×
789
                } else {
×
790
                        klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName)
×
791
                }
×
792
                return copyErr
×
793
        }
794
        return err
×
795
}
796

797
// copyVolume copies a volume form volume or snapshot, snapshot is not supported now
798
func (d *Driver) copyVolume(req *csi.CreateVolumeRequest, accountSASToken string, authAzcopyEnv []string, dstContainerName, storageEndpointSuffix string) error {
8✔
799
        vs := req.VolumeContentSource
8✔
800
        switch vs.Type.(type) {
8✔
801
        case *csi.VolumeContentSource_Snapshot:
2✔
802
                return status.Errorf(codes.InvalidArgument, "copy volume from volumeSnapshot is not supported")
2✔
803
        case *csi.VolumeContentSource_Volume:
6✔
804
                return d.copyBlobContainer(req, accountSASToken, authAzcopyEnv, dstContainerName, storageEndpointSuffix)
6✔
805
        default:
×
806
                return status.Errorf(codes.InvalidArgument, "%v is not a proper volume source", vs)
×
807
        }
808
}
809

810
// authorizeAzcopyWithIdentity returns auth env for azcopy using cluster identity
811
func (d *Driver) authorizeAzcopyWithIdentity() ([]string, error) {
7✔
812
        azureAuthConfig := d.cloud.Config.AzureAuthConfig
7✔
813
        var authAzcopyEnv []string
7✔
814
        if azureAuthConfig.UseManagedIdentityExtension {
11✔
815
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyAutoLoginType, MSI))
4✔
816
                if len(azureAuthConfig.UserAssignedIdentityID) > 0 {
5✔
817
                        klog.V(2).Infof("use user assigned managed identity to authorize azcopy")
1✔
818
                        authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyMSIClientID, azureAuthConfig.UserAssignedIdentityID))
1✔
819
                } else {
4✔
820
                        klog.V(2).Infof("use system-assigned managed identity to authorize azcopy")
3✔
821
                }
3✔
822
                return authAzcopyEnv, nil
4✔
823
        }
824
        if len(azureAuthConfig.AADClientSecret) > 0 {
5✔
825
                klog.V(2).Infof("use service principal to authorize azcopy")
2✔
826
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyAutoLoginType, SPN))
2✔
827
                if azureAuthConfig.AADClientID == "" || azureAuthConfig.TenantID == "" {
3✔
828
                        return []string{}, fmt.Errorf("AADClientID and TenantID must be set when use service principal")
1✔
829
                }
1✔
830
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopySPAApplicationID, azureAuthConfig.AADClientID))
1✔
831
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopySPAClientSecret, azureAuthConfig.AADClientSecret))
1✔
832
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyTenantID, azureAuthConfig.TenantID))
1✔
833
                klog.V(2).Infof(fmt.Sprintf("set AZCOPY_SPA_APPLICATION_ID=%s, AZCOPY_TENANT_ID=%s successfully", azureAuthConfig.AADClientID, azureAuthConfig.TenantID))
1✔
834

1✔
835
                return authAzcopyEnv, nil
1✔
836
        }
837
        return []string{}, fmt.Errorf("service principle or managed identity are both not set")
1✔
838
}
839

840
// getAzcopyAuth will only generate sas token for azcopy in following conditions:
841
// 1. secrets is not empty
842
// 2. driver is not using managed identity and service principal
843
// 3. azcopy returns AuthorizationPermissionMismatch error when using service principal or managed identity
844
func (d *Driver) getAzcopyAuth(ctx context.Context, accountName, accountKey, storageEndpointSuffix string, accountOptions *azure.AccountOptions, secrets map[string]string, secretName, secretNamespace string) (string, []string, error) {
6✔
845
        var authAzcopyEnv []string
6✔
846
        var err error
6✔
847
        useSasToken := false
6✔
848
        if !d.useDataPlaneAPI("", accountName) && len(secrets) == 0 && len(secretName) == 0 {
8✔
849
                // search in cache first
2✔
850
                if cache, err := d.azcopySasTokenCache.Get(accountName, azcache.CacheReadTypeDefault); err == nil && cache != nil {
2✔
851
                        klog.V(2).Infof("use sas token for account(%s) since this account is found in azcopySasTokenCache", accountName)
×
852
                        return cache.(string), nil, nil
×
853
                }
×
854

855
                authAzcopyEnv, err = d.authorizeAzcopyWithIdentity()
2✔
856
                if err != nil {
2✔
857
                        klog.Warningf("failed to authorize azcopy with identity, error: %v", err)
×
858
                } else {
2✔
859
                        if len(authAzcopyEnv) > 0 {
4✔
860
                                out, testErr := d.azcopy.TestListJobs(accountName, storageEndpointSuffix, authAzcopyEnv)
2✔
861
                                if testErr != nil {
2✔
862
                                        return "", nil, fmt.Errorf("azcopy list command failed with error(%v): %v", testErr, out)
×
863
                                }
×
864
                                if strings.Contains(out, authorizationPermissionMismatch) {
2✔
865
                                        klog.Warningf("azcopy list failed with AuthorizationPermissionMismatch error, should assign \"Storage Blob Data Contributor\" role to controller identity, fall back to use sas token, original output: %v", out)
×
866
                                        useSasToken = true
×
867
                                }
×
868
                        }
869
                }
870
        }
871

872
        if len(secrets) > 0 || len(secretName) > 0 || len(authAzcopyEnv) == 0 || useSasToken {
10✔
873
                if accountKey == "" {
8✔
874
                        if _, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
5✔
875
                                return "", nil, err
1✔
876
                        }
1✔
877
                }
878
                klog.V(2).Infof("generate sas token for account(%s)", accountName)
3✔
879
                sasToken, err := d.generateSASToken(accountName, accountKey, storageEndpointSuffix, d.sasTokenExpirationMinutes)
3✔
880
                return sasToken, nil, err
3✔
881
        }
882
        return "", authAzcopyEnv, nil
2✔
883
}
884

885
// isValidVolumeCapabilities validates the given VolumeCapability array is valid
886
func isValidVolumeCapabilities(volCaps []*csi.VolumeCapability) error {
30✔
887
        if len(volCaps) == 0 {
32✔
888
                return fmt.Errorf("volume capabilities missing in request")
2✔
889
        }
2✔
890
        for _, c := range volCaps {
56✔
891
                if c.GetBlock() != nil {
30✔
892
                        return fmt.Errorf("block volume capability not supported")
2✔
893
                }
2✔
894
        }
895
        return nil
26✔
896
}
897

898
func parseDays(dayStr string) (int32, error) {
3✔
899
        days, err := strconv.Atoi(dayStr)
3✔
900
        if err != nil {
4✔
901
                return 0, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid %s:%s in storage class", softDeleteBlobsField, dayStr))
1✔
902
        }
1✔
903
        if days <= 0 || days > 365 {
3✔
904
                return 0, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid %s:%s in storage class, should be in range [1, 365]", softDeleteBlobsField, dayStr))
1✔
905
        }
1✔
906

907
        return int32(days), nil
1✔
908
}
909

910
// generateSASToken generate a sas token for storage account
911
func (d *Driver) generateSASToken(accountName, accountKey, storageEndpointSuffix string, expiryTime int) (string, error) {
5✔
912
        // search in cache first
5✔
913
        cache, err := d.azcopySasTokenCache.Get(accountName, azcache.CacheReadTypeDefault)
5✔
914
        if err != nil {
5✔
915
                return "", fmt.Errorf("get(%s) from azcopySasTokenCache failed with error: %v", accountName, err)
×
916
        }
×
917
        if cache != nil {
5✔
918
                klog.V(2).Infof("use sas token for account(%s) since this account is found in azcopySasTokenCache", accountName)
×
919
                return cache.(string), nil
×
920
        }
×
921

922
        credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
5✔
923
        if err != nil {
8✔
924
                return "", status.Errorf(codes.Internal, fmt.Sprintf("failed to generate sas token in creating new shared key credential, accountName: %s, err: %s", accountName, err.Error()))
3✔
925
        }
3✔
926
        serviceClient, err := service.NewClientWithSharedKeyCredential(fmt.Sprintf("https://%s.blob.%s/", accountName, storageEndpointSuffix), credential, nil)
2✔
927
        if err != nil {
2✔
928
                return "", status.Errorf(codes.Internal, fmt.Sprintf("failed to generate sas token in creating new client with shared key credential, accountName: %s, err: %s", accountName, err.Error()))
×
929
        }
×
930
        sasURL, err := serviceClient.GetSASURL(
2✔
931
                sas.AccountResourceTypes{Object: true, Service: false, Container: true},
2✔
932
                sas.AccountPermissions{Read: true, List: true, Write: true},
2✔
933
                time.Now().Add(time.Duration(expiryTime)*time.Minute),
2✔
934
                &service.GetSASURLOptions{StartTime: to.Ptr(time.Now())},
2✔
935
        )
2✔
936
        if err != nil {
2✔
937
                return "", err
×
938
        }
×
939
        u, err := url.Parse(sasURL)
2✔
940
        if err != nil {
2✔
941
                return "", err
×
942
        }
×
943
        sasToken := "?" + u.RawQuery
2✔
944
        d.azcopySasTokenCache.Set(accountName, sasToken)
2✔
945
        return sasToken, nil
2✔
946
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc