• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubernetes-sigs / blob-csi-driver / 7412225870

04 Jan 2024 04:35PM UTC coverage: 77.871%. Remained the same
7412225870

Pull #1209

github

web-flow
chore(deps): bump golang.org/x/sync from 0.5.0 to 0.6.0

Bumps [golang.org/x/sync](https://github.com/golang/sync) from 0.5.0 to 0.6.0.
- [Commits](https://github.com/golang/sync/compare/v0.5.0...v0.6.0)

---
updated-dependencies:
- dependency-name: golang.org/x/sync
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Pull Request #1209: chore(deps): bump golang.org/x/sync from 0.5.0 to 0.6.0

2034 of 2612 relevant lines covered (77.87%)

7.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.08
/pkg/blob/controllerserver.go
1
/*
2
Copyright 2017 The Kubernetes Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package blob
18

19
import (
20
        "context"
21
        "fmt"
22
        "net/url"
23
        "os"
24
        "os/exec"
25
        "strconv"
26
        "strings"
27
        "time"
28

29
        "google.golang.org/grpc/codes"
30
        "google.golang.org/grpc/status"
31

32
        "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
33
        "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
34
        "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
35
        "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
36
        "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2021-09-01/storage"
37
        azstorage "github.com/Azure/azure-sdk-for-go/storage"
38
        "github.com/container-storage-interface/spec/lib/go/csi"
39

40
        "k8s.io/apimachinery/pkg/util/wait"
41
        "k8s.io/klog/v2"
42
        "k8s.io/utils/pointer"
43

44
        "sigs.k8s.io/blob-csi-driver/pkg/util"
45
        azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache"
46
        "sigs.k8s.io/cloud-provider-azure/pkg/metrics"
47
        "sigs.k8s.io/cloud-provider-azure/pkg/provider"
48
        azure "sigs.k8s.io/cloud-provider-azure/pkg/provider"
49
)
50

51
const (
52
        privateEndpoint = "privateendpoint"
53

54
        azcopyAutoLoginType             = "AZCOPY_AUTO_LOGIN_TYPE"
55
        azcopySPAApplicationID          = "AZCOPY_SPA_APPLICATION_ID"
56
        azcopySPAClientSecret           = "AZCOPY_SPA_CLIENT_SECRET"
57
        azcopyTenantID                  = "AZCOPY_TENANT_ID"
58
        azcopyMSIClientID               = "AZCOPY_MSI_CLIENT_ID"
59
        MSI                             = "MSI"
60
        SPN                             = "SPN"
61
        authorizationPermissionMismatch = "AuthorizationPermissionMismatch"
62

63
        waitForCopyInterval = 5 * time.Second
64
        waitForCopyTimeout  = 3 * time.Minute
65
)
66

67
// CreateVolume provisions a volume
68
func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
23✔
69
        if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
23✔
70
                klog.Errorf("invalid create volume req: %v", req)
×
71
                return nil, err
×
72
        }
×
73

74
        volName := req.GetName()
23✔
75
        if len(volName) == 0 {
24✔
76
                return nil, status.Error(codes.InvalidArgument, "CreateVolume Name must be provided")
1✔
77
        }
1✔
78

79
        if err := isValidVolumeCapabilities(req.GetVolumeCapabilities()); err != nil {
24✔
80
                return nil, status.Error(codes.InvalidArgument, err.Error())
2✔
81
        }
2✔
82

83
        if acquired := d.volumeLocks.TryAcquire(volName); !acquired {
20✔
84
                // logging the job status if it's volume cloning
×
85
                if req.GetVolumeContentSource() != nil {
×
86
                        jobState, percent, err := d.azcopy.GetAzcopyJob(volName, []string{})
×
87
                        klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
×
88
                }
×
89
                return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName)
×
90
        }
91
        defer d.volumeLocks.Release(volName)
20✔
92

20✔
93
        volSizeBytes := int64(req.GetCapacityRange().GetRequiredBytes())
20✔
94
        requestGiB := int(util.RoundUpGiB(volSizeBytes))
20✔
95

20✔
96
        parameters := req.GetParameters()
20✔
97
        if parameters == nil {
21✔
98
                parameters = make(map[string]string)
1✔
99
        }
1✔
100
        var storageAccountType, subsID, resourceGroup, location, account, containerName, containerNamePrefix, protocol, customTags, secretName, secretNamespace, pvcNamespace string
20✔
101
        var isHnsEnabled, requireInfraEncryption, enableBlobVersioning, createPrivateEndpoint, enableNfsV3 *bool
20✔
102
        var vnetResourceGroup, vnetName, subnetName, accessTier, networkEndpointType, storageEndpointSuffix, fsGroupChangePolicy string
20✔
103
        var matchTags, useDataPlaneAPI, getLatestAccountKey bool
20✔
104
        var softDeleteBlobs, softDeleteContainers int32
20✔
105
        var vnetResourceIDs []string
20✔
106
        var err error
20✔
107
        // set allowBlobPublicAccess as false by default
20✔
108
        allowBlobPublicAccess := pointer.Bool(false)
20✔
109

20✔
110
        containerNameReplaceMap := map[string]string{}
20✔
111

20✔
112
        // store account key to k8s secret by default
20✔
113
        storeAccountKey := true
20✔
114

20✔
115
        // Apply ProvisionerParameters (case-insensitive). We leave validation of
20✔
116
        // the values to the cloud provider.
20✔
117
        for k, v := range parameters {
128✔
118
                switch strings.ToLower(k) {
108✔
119
                case skuNameField:
10✔
120
                        storageAccountType = v
10✔
121
                case storageAccountTypeField:
11✔
122
                        storageAccountType = v
11✔
123
                case locationField:
10✔
124
                        location = v
10✔
125
                case storageAccountField:
11✔
126
                        account = v
11✔
127
                case subscriptionIDField:
2✔
128
                        subsID = v
2✔
129
                case resourceGroupField:
11✔
130
                        resourceGroup = v
11✔
131
                case containerNameField:
12✔
132
                        containerName = v
12✔
133
                case containerNamePrefixField:
2✔
134
                        containerNamePrefix = v
2✔
135
                case protocolField:
11✔
136
                        protocol = v
11✔
137
                case tagsField:
1✔
138
                        customTags = v
1✔
139
                case matchTagsField:
1✔
140
                        matchTags = strings.EqualFold(v, trueValue)
1✔
141
                case secretNameField:
×
142
                        secretName = v
×
143
                case secretNamespaceField:
×
144
                        secretNamespace = v
×
145
                case isHnsEnabledField:
×
146
                        if strings.EqualFold(v, trueValue) {
×
147
                                isHnsEnabled = pointer.Bool(true)
×
148
                        }
×
149
                case softDeleteBlobsField:
×
150
                        days, err := parseDays(v)
×
151
                        if err != nil {
×
152
                                return nil, err
×
153
                        }
×
154
                        softDeleteBlobs = days
×
155
                case softDeleteContainersField:
×
156
                        days, err := parseDays(v)
×
157
                        if err != nil {
×
158
                                return nil, err
×
159
                        }
×
160
                        softDeleteContainers = days
×
161
                case enableBlobVersioningField:
×
162
                        enableBlobVersioning = pointer.Bool(strings.EqualFold(v, trueValue))
×
163
                case storeAccountKeyField:
4✔
164
                        if strings.EqualFold(v, falseValue) {
7✔
165
                                storeAccountKey = false
3✔
166
                        }
3✔
167
                case getLatestAccountKeyField:
1✔
168
                        if getLatestAccountKey, err = strconv.ParseBool(v); err != nil {
2✔
169
                                return nil, status.Errorf(codes.InvalidArgument, "invalid %s: %s in volume context", getLatestAccountKeyField, v)
1✔
170
                        }
1✔
171
                case allowBlobPublicAccessField:
×
172
                        if strings.EqualFold(v, trueValue) {
×
173
                                allowBlobPublicAccess = pointer.Bool(true)
×
174
                        }
×
175
                case requireInfraEncryptionField:
×
176
                        if strings.EqualFold(v, trueValue) {
×
177
                                requireInfraEncryption = pointer.Bool(true)
×
178
                        }
×
179
                case pvcNamespaceKey:
×
180
                        pvcNamespace = v
×
181
                        containerNameReplaceMap[pvcNamespaceMetadata] = v
×
182
                case pvcNameKey:
×
183
                        containerNameReplaceMap[pvcNameMetadata] = v
×
184
                case pvNameKey:
×
185
                        containerNameReplaceMap[pvNameMetadata] = v
×
186
                case serverNameField:
×
187
                case storageAuthTypeField:
1✔
188
                case storageIdentityClientIDField:
1✔
189
                case storageIdentityObjectIDField:
1✔
190
                case storageIdentityResourceIDField:
1✔
191
                case msiEndpointField:
1✔
192
                case storageAADEndpointField:
1✔
193
                        // no op, only used in NodeStageVolume
194
                case storageEndpointSuffixField:
×
195
                        storageEndpointSuffix = v
×
196
                case vnetResourceGroupField:
×
197
                        vnetResourceGroup = v
×
198
                case vnetNameField:
×
199
                        vnetName = v
×
200
                case subnetNameField:
×
201
                        subnetName = v
×
202
                case accessTierField:
×
203
                        accessTier = v
×
204
                case networkEndpointTypeField:
×
205
                        networkEndpointType = v
×
206
                case mountPermissionsField:
12✔
207
                        // only do validations here, used in NodeStageVolume, NodePublishVolume
12✔
208
                        if v != "" {
24✔
209
                                if _, err := strconv.ParseUint(v, 8, 32); err != nil {
13✔
210
                                        return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid mountPermissions %s in storage class", v))
1✔
211
                                }
1✔
212
                        }
213
                case useDataPlaneAPIField:
1✔
214
                        useDataPlaneAPI = strings.EqualFold(v, trueValue)
1✔
215
                case fsGroupChangePolicyField:
1✔
216
                        fsGroupChangePolicy = v
1✔
217
                default:
1✔
218
                        return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid parameter %q in storage class", k))
1✔
219
                }
220
        }
221

222
        if pointer.BoolDeref(enableBlobVersioning, false) {
17✔
223
                if isNFSProtocol(protocol) || pointer.BoolDeref(isHnsEnabled, false) {
×
224
                        return nil, status.Errorf(codes.InvalidArgument, "enableBlobVersioning is not supported for NFS protocol or HNS enabled account")
×
225
                }
×
226
        }
227

228
        if !isSupportedFSGroupChangePolicy(fsGroupChangePolicy) {
18✔
229
                return nil, status.Errorf(codes.InvalidArgument, "fsGroupChangePolicy(%s) is not supported, supported fsGroupChangePolicy list: %v", fsGroupChangePolicy, supportedFSGroupChangePolicyList)
1✔
230
        }
1✔
231

232
        if matchTags && account != "" {
17✔
233
                return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("matchTags must set as false when storageAccount(%s) is provided", account))
1✔
234
        }
1✔
235

236
        if subsID != "" && subsID != d.cloud.SubscriptionID {
17✔
237
                if isNFSProtocol(protocol) {
3✔
238
                        return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("NFS protocol is not supported in cross subscription(%s)", subsID))
1✔
239
                }
1✔
240
                if !storeAccountKey {
2✔
241
                        return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("storeAccountKey must set as true in cross subscription(%s)", subsID))
1✔
242
                }
1✔
243
        }
244

245
        if resourceGroup == "" {
18✔
246
                resourceGroup = d.cloud.ResourceGroup
5✔
247
        }
5✔
248

249
        if secretNamespace == "" {
26✔
250
                if pvcNamespace == "" {
26✔
251
                        secretNamespace = defaultNamespace
13✔
252
                } else {
13✔
253
                        secretNamespace = pvcNamespace
×
254
                }
×
255
        }
256

257
        if protocol == "" {
17✔
258
                protocol = Fuse
4✔
259
        }
4✔
260
        if !isSupportedProtocol(protocol) {
14✔
261
                return nil, status.Errorf(codes.InvalidArgument, "protocol(%s) is not supported, supported protocol list: %v", protocol, supportedProtocolList)
1✔
262
        }
1✔
263
        if !isSupportedAccessTier(accessTier) {
12✔
264
                return nil, status.Errorf(codes.InvalidArgument, "accessTier(%s) is not supported, supported AccessTier list: %v", accessTier, storage.PossibleAccessTierValues())
×
265
        }
×
266

267
        if containerName != "" && containerNamePrefix != "" {
13✔
268
                return nil, status.Errorf(codes.InvalidArgument, "containerName(%s) and containerNamePrefix(%s) could not be specified together", containerName, containerNamePrefix)
1✔
269
        }
1✔
270
        if !isSupportedContainerNamePrefix(containerNamePrefix) {
12✔
271
                return nil, status.Errorf(codes.InvalidArgument, "containerNamePrefix(%s) can only contain lowercase letters, numbers, hyphens, and length should be less than 21", containerNamePrefix)
1✔
272
        }
1✔
273

274
        enableHTTPSTrafficOnly := true
10✔
275
        if strings.EqualFold(networkEndpointType, privateEndpoint) {
10✔
276
                createPrivateEndpoint = pointer.BoolPtr(true)
×
277
        }
×
278
        accountKind := string(storage.KindStorageV2)
10✔
279
        if isNFSProtocol(protocol) {
11✔
280
                isHnsEnabled = pointer.Bool(true)
1✔
281
                enableNfsV3 = pointer.Bool(true)
1✔
282
                // NFS protocol does not need account key
1✔
283
                storeAccountKey = false
1✔
284
                if !pointer.BoolDeref(createPrivateEndpoint, false) {
2✔
285
                        // set VirtualNetworkResourceIDs for storage account firewall setting
1✔
286
                        vnetResourceID := d.getSubnetResourceID(vnetResourceGroup, vnetName, subnetName)
1✔
287
                        klog.V(2).Infof("set vnetResourceID(%s) for NFS protocol", vnetResourceID)
1✔
288
                        vnetResourceIDs = []string{vnetResourceID}
1✔
289
                        if err := d.updateSubnetServiceEndpoints(ctx, vnetResourceGroup, vnetName, subnetName); err != nil {
2✔
290
                                return nil, status.Errorf(codes.Internal, "update service endpoints failed with error: %v", err)
1✔
291
                        }
1✔
292
                }
293
        }
294

295
        if strings.HasPrefix(strings.ToLower(storageAccountType), "premium") {
10✔
296
                accountKind = string(storage.KindBlockBlobStorage)
1✔
297
        }
1✔
298
        if IsAzureStackCloud(d.cloud) {
10✔
299
                accountKind = string(storage.KindStorage)
1✔
300
                if storageAccountType != "" && storageAccountType != string(storage.SkuNameStandardLRS) && storageAccountType != string(storage.SkuNamePremiumLRS) {
2✔
301
                        return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("Invalid skuName value: %s, as Azure Stack only supports %s and %s Storage Account types.", storageAccountType, storage.SkuNamePremiumLRS, storage.SkuNameStandardLRS))
1✔
302
                }
1✔
303
        }
304

305
        tags, err := util.ConvertTagsToMap(customTags)
8✔
306
        if err != nil {
9✔
307
                return nil, status.Errorf(codes.InvalidArgument, err.Error())
1✔
308
        }
1✔
309

310
        if strings.TrimSpace(storageEndpointSuffix) == "" {
14✔
311
                if d.cloud.Environment.StorageEndpointSuffix != "" {
7✔
312
                        storageEndpointSuffix = d.cloud.Environment.StorageEndpointSuffix
×
313
                } else {
7✔
314
                        storageEndpointSuffix = defaultStorageEndPointSuffix
7✔
315
                }
7✔
316
        }
317

318
        accountOptions := &azure.AccountOptions{
7✔
319
                Name:                            account,
7✔
320
                Type:                            storageAccountType,
7✔
321
                Kind:                            accountKind,
7✔
322
                SubscriptionID:                  subsID,
7✔
323
                ResourceGroup:                   resourceGroup,
7✔
324
                Location:                        location,
7✔
325
                EnableHTTPSTrafficOnly:          enableHTTPSTrafficOnly,
7✔
326
                VirtualNetworkResourceIDs:       vnetResourceIDs,
7✔
327
                Tags:                            tags,
7✔
328
                MatchTags:                       matchTags,
7✔
329
                IsHnsEnabled:                    isHnsEnabled,
7✔
330
                EnableNfsV3:                     enableNfsV3,
7✔
331
                AllowBlobPublicAccess:           allowBlobPublicAccess,
7✔
332
                RequireInfrastructureEncryption: requireInfraEncryption,
7✔
333
                VNetResourceGroup:               vnetResourceGroup,
7✔
334
                VNetName:                        vnetName,
7✔
335
                SubnetName:                      subnetName,
7✔
336
                AccessTier:                      accessTier,
7✔
337
                CreatePrivateEndpoint:           createPrivateEndpoint,
7✔
338
                StorageType:                     provider.StorageTypeBlob,
7✔
339
                StorageEndpointSuffix:           storageEndpointSuffix,
7✔
340
                EnableBlobVersioning:            enableBlobVersioning,
7✔
341
                SoftDeleteBlobs:                 softDeleteBlobs,
7✔
342
                SoftDeleteContainers:            softDeleteContainers,
7✔
343
                GetLatestAccountKey:             getLatestAccountKey,
7✔
344
        }
7✔
345

7✔
346
        var volumeID string
7✔
347
        requestName := "controller_create_volume"
7✔
348
        if req.GetVolumeContentSource() != nil {
9✔
349
                switch req.VolumeContentSource.Type.(type) {
2✔
350
                case *csi.VolumeContentSource_Snapshot:
1✔
351
                        requestName = "controller_create_volume_from_snapshot"
1✔
352
                case *csi.VolumeContentSource_Volume:
1✔
353
                        requestName = "controller_create_volume_from_volume"
1✔
354
                }
355
        }
356
        mc := metrics.NewMetricContext(blobCSIDriverName, requestName, d.cloud.ResourceGroup, d.cloud.SubscriptionID, d.Name)
7✔
357
        isOperationSucceeded := false
7✔
358
        defer func() {
14✔
359
                mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
7✔
360
        }()
7✔
361

362
        var accountKey string
7✔
363
        accountName := account
7✔
364
        secrets := req.GetSecrets()
7✔
365
        if len(secrets) == 0 && accountName == "" {
8✔
366
                if v, ok := d.volMap.Load(volName); ok {
1✔
367
                        accountName = v.(string)
×
368
                } else {
1✔
369
                        lockKey := fmt.Sprintf("%s%s%s%s%s%v", storageAccountType, accountKind, resourceGroup, location, protocol, pointer.BoolDeref(createPrivateEndpoint, false))
1✔
370
                        // search in cache first
1✔
371
                        cache, err := d.accountSearchCache.Get(lockKey, azcache.CacheReadTypeDefault)
1✔
372
                        if err != nil {
1✔
373
                                return nil, status.Errorf(codes.Internal, err.Error())
×
374
                        }
×
375
                        if cache != nil {
1✔
376
                                accountName = cache.(string)
×
377
                        } else {
1✔
378
                                d.volLockMap.LockEntry(lockKey)
1✔
379
                                err = wait.ExponentialBackoff(d.cloud.RequestBackoff(), func() (bool, error) {
2✔
380
                                        var retErr error
1✔
381
                                        accountName, accountKey, retErr = d.cloud.EnsureStorageAccount(ctx, accountOptions, protocol)
1✔
382
                                        if isRetriableError(retErr) {
1✔
383
                                                klog.Warningf("EnsureStorageAccount(%s) failed with error(%v), waiting for retrying", account, retErr)
×
384
                                                return false, nil
×
385
                                        }
×
386
                                        return true, retErr
1✔
387
                                })
388
                                d.volLockMap.UnlockEntry(lockKey)
1✔
389
                                if err != nil {
2✔
390
                                        return nil, status.Errorf(codes.Internal, "ensure storage account failed with %v", err)
1✔
391
                                }
1✔
392
                                d.accountSearchCache.Set(lockKey, accountName)
×
393
                                d.volMap.Store(volName, accountName)
×
394
                        }
395
                }
396
        }
397

398
        if pointer.BoolDeref(createPrivateEndpoint, false) && isNFSProtocol(protocol) {
6✔
399
                // As for blobfuse/blobfuse2, serverName, i.e.,AZURE_STORAGE_BLOB_ENDPOINT env variable can't include
×
400
                // "privatelink", issue: https://github.com/Azure/azure-storage-fuse/issues/1014
×
401
                //
×
402
                // And use public endpoint will be befine to blobfuse/blobfuse2, because it will be resolved to private endpoint
×
403
                // by private dns zone, which includes CNAME record, documented here:
×
404
                // https://learn.microsoft.com/en-us/azure/storage/common/storage-private-endpoints?toc=%2Fazure%2Fstorage%2Fblobs%2Ftoc.json&bc=%2Fazure%2Fstorage%2Fblobs%2Fbreadcrumb%2Ftoc.json#dns-changes-for-private-endpoints
×
405
                setKeyValueInMap(parameters, serverNameField, fmt.Sprintf("%s.privatelink.blob.%s", accountName, storageEndpointSuffix))
×
406
        }
×
407

408
        accountOptions.Name = accountName
6✔
409
        if len(secrets) == 0 && useDataPlaneAPI {
7✔
410
                if accountKey == "" {
2✔
411
                        if accountName, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
2✔
412
                                return nil, status.Errorf(codes.Internal, "failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
1✔
413
                        }
1✔
414
                }
415
                secrets = createStorageAccountSecret(accountName, accountKey)
×
416
        }
417

418
        // replace pv/pvc name namespace metadata in subDir
419
        containerName = replaceWithMap(containerName, containerNameReplaceMap)
5✔
420
        validContainerName := containerName
5✔
421
        if validContainerName == "" {
5✔
422
                validContainerName = volName
×
423
                if containerNamePrefix != "" {
×
424
                        validContainerName = containerNamePrefix + "-" + volName
×
425
                }
×
426
                validContainerName = getValidContainerName(validContainerName, protocol)
×
427
                setKeyValueInMap(parameters, containerNameField, validContainerName)
×
428
        }
429

430
        if req.GetVolumeContentSource() != nil {
7✔
431
                accountSASToken, authAzcopyEnv, err := d.getAzcopyAuth(ctx, accountName, accountKey, storageEndpointSuffix, accountOptions, secrets, secretName, secretNamespace)
2✔
432
                if err != nil {
2✔
433
                        return nil, status.Errorf(codes.Internal, "failed to getAzcopyAuth on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
×
434
                }
×
435
                if err := d.copyVolume(req, accountSASToken, authAzcopyEnv, validContainerName, storageEndpointSuffix); err != nil {
4✔
436
                        return nil, err
2✔
437
                }
2✔
438
        } else {
3✔
439
                klog.V(2).Infof("begin to create container(%s) on account(%s) type(%s) subsID(%s) rg(%s) location(%s) size(%d)", validContainerName, accountName, storageAccountType, subsID, resourceGroup, location, requestGiB)
3✔
440
                if err := d.CreateBlobContainer(ctx, subsID, resourceGroup, accountName, validContainerName, secrets); err != nil {
4✔
441
                        return nil, status.Errorf(codes.Internal, "failed to create container(%s) on account(%s) type(%s) rg(%s) location(%s) size(%d), error: %v", validContainerName, accountName, storageAccountType, resourceGroup, location, requestGiB, err)
1✔
442
                }
1✔
443
        }
444

445
        if storeAccountKey && len(req.GetSecrets()) == 0 {
4✔
446
                if accountKey == "" {
4✔
447
                        if accountName, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
3✔
448
                                return nil, status.Errorf(codes.Internal, "failed to GetStorageAccesskey on account(%s) rg(%s), error: %v", accountOptions.Name, accountOptions.ResourceGroup, err)
1✔
449
                        }
1✔
450
                }
451

452
                secretName, err := setAzureCredentials(ctx, d.cloud.KubeClient, accountName, accountKey, secretNamespace)
1✔
453
                if err != nil {
1✔
454
                        return nil, status.Errorf(codes.Internal, "failed to store storage account key: %v", err)
×
455
                }
×
456
                if secretName != "" {
1✔
457
                        klog.V(2).Infof("store account key to k8s secret(%v) in %s namespace", secretName, secretNamespace)
×
458
                }
×
459
        }
460

461
        var uuid string
1✔
462
        if containerName != "" {
2✔
463
                // add volume name as suffix to differentiate volumeID since "containerName" is specified
1✔
464
                // not necessary for dynamic container name creation since volumeID already contains volume name
1✔
465
                uuid = volName
1✔
466
        }
1✔
467
        volumeID = fmt.Sprintf(volumeIDTemplate, resourceGroup, accountName, validContainerName, uuid, secretNamespace, subsID)
1✔
468
        klog.V(2).Infof("create container %s on storage account %s successfully", validContainerName, accountName)
1✔
469

1✔
470
        if useDataPlaneAPI {
1✔
471
                d.dataPlaneAPIVolCache.Set(volumeID, "")
×
472
                d.dataPlaneAPIVolCache.Set(accountName, "")
×
473
        }
×
474

475
        isOperationSucceeded = true
1✔
476
        // reset secretNamespace field in VolumeContext
1✔
477
        setKeyValueInMap(parameters, secretNamespaceField, secretNamespace)
1✔
478
        return &csi.CreateVolumeResponse{
1✔
479
                Volume: &csi.Volume{
1✔
480
                        VolumeId:      volumeID,
1✔
481
                        CapacityBytes: req.GetCapacityRange().GetRequiredBytes(),
1✔
482
                        VolumeContext: parameters,
1✔
483
                        ContentSource: req.GetVolumeContentSource(),
1✔
484
                },
1✔
485
        }, nil
1✔
486
}
487

488
// DeleteVolume delete a volume
489
func (d *Driver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
5✔
490
        volumeID := req.GetVolumeId()
5✔
491
        if len(volumeID) == 0 {
6✔
492
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
493
        }
1✔
494

495
        if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
4✔
496
                return nil, status.Errorf(codes.Internal, "invalid delete volume req: %v", req)
×
497
        }
×
498

499
        if acquired := d.volumeLocks.TryAcquire(volumeID); !acquired {
4✔
500
                return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
×
501
        }
×
502
        defer d.volumeLocks.Release(volumeID)
4✔
503

4✔
504
        resourceGroupName, accountName, containerName, _, subsID, err := GetContainerInfo(volumeID)
4✔
505
        if err != nil {
5✔
506
                // According to CSI Driver Sanity Tester, should succeed when an invalid volume id is used
1✔
507
                klog.Errorf("GetContainerInfo(%s) in DeleteVolume failed with error: %v", volumeID, err)
1✔
508
                return &csi.DeleteVolumeResponse{}, nil
1✔
509
        }
1✔
510

511
        secrets := req.GetSecrets()
3✔
512
        if len(secrets) == 0 && d.useDataPlaneAPI(volumeID, accountName) {
4✔
513
                _, accountName, accountKey, _, _, err := d.GetAuthEnv(ctx, volumeID, "", nil, secrets)
1✔
514
                if err != nil {
2✔
515
                        return nil, status.Errorf(codes.Internal, "GetAuthEnv(%s) failed with %v", volumeID, err)
1✔
516
                }
1✔
517
                if accountName != "" && accountKey != "" {
×
518
                        secrets = createStorageAccountSecret(accountName, accountKey)
×
519
                }
×
520
        }
521

522
        mc := metrics.NewMetricContext(blobCSIDriverName, "controller_delete_volume", d.cloud.ResourceGroup, d.cloud.SubscriptionID, d.Name)
2✔
523
        isOperationSucceeded := false
2✔
524
        defer func() {
4✔
525
                mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
2✔
526
        }()
2✔
527

528
        if resourceGroupName == "" {
3✔
529
                resourceGroupName = d.cloud.ResourceGroup
1✔
530
        }
1✔
531
        klog.V(2).Infof("deleting container(%s) rg(%s) account(%s) volumeID(%s)", containerName, resourceGroupName, accountName, volumeID)
2✔
532
        if err := d.DeleteBlobContainer(ctx, subsID, resourceGroupName, accountName, containerName, secrets); err != nil {
4✔
533
                return nil, status.Errorf(codes.Internal, "failed to delete container(%s) under rg(%s) account(%s) volumeID(%s), error: %v", containerName, resourceGroupName, accountName, volumeID, err)
2✔
534
        }
2✔
535

536
        isOperationSucceeded = true
×
537
        klog.V(2).Infof("container(%s) under rg(%s) account(%s) volumeID(%s) is deleted successfully", containerName, resourceGroupName, accountName, volumeID)
×
538
        return &csi.DeleteVolumeResponse{}, nil
×
539
}
540

541
// ValidateVolumeCapabilities return the capabilities of the volume
542
func (d *Driver) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
8✔
543
        volumeID := req.GetVolumeId()
8✔
544
        if len(volumeID) == 0 {
9✔
545
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
546
        }
1✔
547
        if err := isValidVolumeCapabilities(req.GetVolumeCapabilities()); err != nil {
9✔
548
                return nil, status.Error(codes.InvalidArgument, err.Error())
2✔
549
        }
2✔
550

551
        resourceGroupName, accountName, containerName, _, subsID, err := GetContainerInfo(volumeID)
5✔
552
        if err != nil {
6✔
553
                klog.Errorf("GetContainerInfo(%s) in ValidateVolumeCapabilities failed with error: %v", volumeID, err)
1✔
554
                return nil, status.Error(codes.NotFound, err.Error())
1✔
555
        }
1✔
556

557
        var exist bool
4✔
558
        secrets := req.GetSecrets()
4✔
559
        if len(secrets) > 0 {
5✔
560
                container, err := getContainerReference(containerName, secrets, d.cloud.Environment)
1✔
561
                if err != nil {
2✔
562
                        return nil, status.Error(codes.Internal, err.Error())
1✔
563
                }
1✔
564
                exist, err = container.Exists()
×
565
                if err != nil {
×
566
                        return nil, status.Error(codes.Internal, err.Error())
×
567
                }
×
568
        } else {
3✔
569
                if resourceGroupName == "" {
3✔
570
                        resourceGroupName = d.cloud.ResourceGroup
×
571
                }
×
572
                blobContainer, retryErr := d.cloud.BlobClient.GetContainer(ctx, subsID, resourceGroupName, accountName, containerName)
3✔
573
                err = retryErr.Error()
3✔
574
                if err != nil {
4✔
575
                        return nil, status.Error(codes.Internal, err.Error())
1✔
576
                }
1✔
577
                if blobContainer.ContainerProperties == nil {
3✔
578
                        return nil, status.Errorf(codes.Internal, "ContainerProperties of volume(%s) is nil", volumeID)
1✔
579
                }
1✔
580
                exist = blobContainer.ContainerProperties.Deleted != nil && !*blobContainer.ContainerProperties.Deleted
1✔
581
        }
582
        if !exist {
2✔
583
                return nil, status.Errorf(codes.NotFound, "requested volume(%s) does not exist", volumeID)
1✔
584
        }
1✔
585
        klog.V(2).Infof("ValidateVolumeCapabilities on volume(%s) succeeded", volumeID)
×
586

×
587
        // blob driver supports all AccessModes, no need to check capabilities here
×
588
        return &csi.ValidateVolumeCapabilitiesResponse{
×
589
                Confirmed: &csi.ValidateVolumeCapabilitiesResponse_Confirmed{
×
590
                        VolumeCapabilities: req.GetVolumeCapabilities(),
×
591
                },
×
592
                Message: "",
×
593
        }, nil
×
594
}
595

596
func (d *Driver) ControllerPublishVolume(_ context.Context, _ *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
1✔
597
        return nil, status.Error(codes.Unimplemented, "ControllerPublishVolume is not yet implemented")
1✔
598
}
1✔
599

600
func (d *Driver) ControllerUnpublishVolume(_ context.Context, _ *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
1✔
601
        return nil, status.Error(codes.Unimplemented, "ControllerUnpublishVolume is not yet implemented")
1✔
602
}
1✔
603

604
// ControllerGetVolume get volume
605
func (d *Driver) ControllerGetVolume(context.Context, *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
1✔
606
        return nil, status.Error(codes.Unimplemented, "ControllerGetVolume is not yet implemented")
1✔
607
}
1✔
608

609
// GetCapacity returns the capacity of the total available storage pool
610
func (d *Driver) GetCapacity(_ context.Context, _ *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
1✔
611
        return nil, status.Error(codes.Unimplemented, "GetCapacity is not yet implemented")
1✔
612
}
1✔
613

614
// ListVolumes return all available volumes
615
func (d *Driver) ListVolumes(_ context.Context, _ *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
1✔
616
        return nil, status.Error(codes.Unimplemented, "ListVolumes is not yet implemented")
1✔
617
}
1✔
618

619
// CreateSnapshot create snapshot
620
func (d *Driver) CreateSnapshot(_ context.Context, _ *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
1✔
621
        return nil, status.Error(codes.Unimplemented, "CreateSnapshot is not yet implemented")
1✔
622
}
1✔
623

624
// DeleteSnapshot delete snapshot
625
func (d *Driver) DeleteSnapshot(_ context.Context, _ *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
1✔
626
        return nil, status.Error(codes.Unimplemented, "DeleteSnapshot is not yet implemented")
1✔
627
}
1✔
628

629
// ListSnapshots list snapshots
630
func (d *Driver) ListSnapshots(_ context.Context, _ *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
1✔
631
        return nil, status.Error(codes.Unimplemented, "ListSnapshots is not yet implemented")
1✔
632
}
1✔
633

634
// ControllerGetCapabilities returns the capabilities of the Controller plugin
635
func (d *Driver) ControllerGetCapabilities(_ context.Context, _ *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
1✔
636
        return &csi.ControllerGetCapabilitiesResponse{
1✔
637
                Capabilities: d.Cap,
1✔
638
        }, nil
1✔
639
}
1✔
640

641
// ControllerExpandVolume controller expand volume
642
func (d *Driver) ControllerExpandVolume(_ context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
4✔
643
        if len(req.GetVolumeId()) == 0 {
5✔
644
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
645
        }
1✔
646

647
        if req.GetCapacityRange() == nil {
4✔
648
                return nil, status.Error(codes.InvalidArgument, "Capacity Range missing in request")
1✔
649
        }
1✔
650

651
        if err := d.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_EXPAND_VOLUME); err != nil {
2✔
652
                return nil, status.Errorf(codes.Internal, "invalid expand volume req: %v", req)
×
653
        }
×
654

655
        volSizeBytes := int64(req.GetCapacityRange().GetRequiredBytes())
2✔
656
        requestGiB := int64(util.RoundUpGiB(volSizeBytes))
2✔
657

2✔
658
        if volSizeBytes > containerMaxSize {
3✔
659
                return nil, status.Errorf(codes.OutOfRange, "required bytes (%d) exceeds the maximum supported bytes (%d)", volSizeBytes, containerMaxSize)
1✔
660
        }
1✔
661

662
        klog.V(2).Infof("ControllerExpandVolume(%s) successfully, currentQuota: %d Gi", req.VolumeId, requestGiB)
1✔
663

1✔
664
        return &csi.ControllerExpandVolumeResponse{CapacityBytes: req.GetCapacityRange().GetRequiredBytes()}, nil
1✔
665
}
666

667
// CreateBlobContainer creates a blob container
668
func (d *Driver) CreateBlobContainer(ctx context.Context, subsID, resourceGroupName, accountName, containerName string, secrets map[string]string) error {
9✔
669
        if containerName == "" {
10✔
670
                return fmt.Errorf("containerName is empty")
1✔
671
        }
1✔
672
        return wait.ExponentialBackoff(d.cloud.RequestBackoff(), func() (bool, error) {
16✔
673
                var err error
8✔
674
                if len(secrets) > 0 {
9✔
675
                        container, getErr := getContainerReference(containerName, secrets, d.cloud.Environment)
1✔
676
                        if getErr != nil {
2✔
677
                                return true, getErr
1✔
678
                        }
1✔
679
                        _, err = container.CreateIfNotExists(&azstorage.CreateContainerOptions{Access: azstorage.ContainerAccessTypePrivate})
×
680
                } else {
7✔
681
                        blobContainer := storage.BlobContainer{
7✔
682
                                ContainerProperties: &storage.ContainerProperties{
7✔
683
                                        PublicAccess: storage.PublicAccessNone,
7✔
684
                                },
7✔
685
                        }
7✔
686
                        err = d.cloud.BlobClient.CreateContainer(ctx, subsID, resourceGroupName, accountName, containerName, blobContainer).Error()
7✔
687
                }
7✔
688
                if err != nil {
11✔
689
                        if strings.Contains(err.Error(), containerBeingDeletedDataplaneAPIError) ||
4✔
690
                                strings.Contains(err.Error(), containerBeingDeletedManagementAPIError) {
7✔
691
                                klog.Warningf("CreateContainer(%s, %s, %s) failed with error(%v), retry", resourceGroupName, accountName, containerName, err)
3✔
692
                                return false, nil
3✔
693
                        }
3✔
694
                }
695
                return true, err
4✔
696
        })
697
}
698

699
// DeleteBlobContainer deletes a blob container
700
func (d *Driver) DeleteBlobContainer(ctx context.Context, subsID, resourceGroupName, accountName, containerName string, secrets map[string]string) error {
8✔
701
        if containerName == "" {
9✔
702
                return fmt.Errorf("containerName is empty")
1✔
703
        }
1✔
704
        return wait.ExponentialBackoff(d.cloud.RequestBackoff(), func() (bool, error) {
14✔
705
                var err error
7✔
706
                if len(secrets) > 0 {
10✔
707
                        container, getErr := getContainerReference(containerName, secrets, d.cloud.Environment)
3✔
708
                        if getErr != nil {
6✔
709
                                return true, getErr
3✔
710
                        }
3✔
711
                        _, err = container.DeleteIfExists(nil)
×
712
                } else {
4✔
713
                        err = d.cloud.BlobClient.DeleteContainer(ctx, subsID, resourceGroupName, accountName, containerName).Error()
4✔
714
                }
4✔
715
                if err != nil {
7✔
716
                        if strings.Contains(err.Error(), containerBeingDeletedDataplaneAPIError) ||
3✔
717
                                strings.Contains(err.Error(), containerBeingDeletedManagementAPIError) ||
3✔
718
                                strings.Contains(err.Error(), statusCodeNotFound) ||
3✔
719
                                strings.Contains(err.Error(), httpCodeNotFound) {
5✔
720
                                klog.Warningf("delete container(%s) on account(%s) failed with error(%v), return as success", containerName, accountName, err)
2✔
721
                                return true, nil
2✔
722
                        }
2✔
723
                        return false, fmt.Errorf("failed to delete container(%s) on account(%s), error: %w", containerName, accountName, err)
1✔
724
                }
725
                return true, err
1✔
726
        })
727
}
728

729
// CopyBlobContainer copies a blob container in the same storage account
730
func (d *Driver) copyBlobContainer(req *csi.CreateVolumeRequest, accountSasToken string, authAzcopyEnv []string, dstContainerName, storageEndpointSuffix string) error {
6✔
731
        var sourceVolumeID string
6✔
732
        if req.GetVolumeContentSource() != nil && req.GetVolumeContentSource().GetVolume() != nil {
12✔
733
                sourceVolumeID = req.GetVolumeContentSource().GetVolume().GetVolumeId()
6✔
734

6✔
735
        }
6✔
736
        resourceGroupName, accountName, srcContainerName, _, _, err := GetContainerInfo(sourceVolumeID) //nolint:dogsled
6✔
737
        if err != nil {
8✔
738
                return status.Error(codes.NotFound, err.Error())
2✔
739
        }
2✔
740
        if srcContainerName == "" || dstContainerName == "" {
6✔
741
                return fmt.Errorf("srcContainerName(%s) or dstContainerName(%s) is empty", srcContainerName, dstContainerName)
2✔
742
        }
2✔
743

744
        timeAfter := time.After(waitForCopyTimeout)
2✔
745
        timeTick := time.Tick(waitForCopyInterval)
2✔
746
        srcPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, srcContainerName, accountSasToken)
2✔
747
        dstPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, dstContainerName, accountSasToken)
2✔
748

2✔
749
        jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
2✔
750
        klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
2✔
751
        if jobState == util.AzcopyJobError || jobState == util.AzcopyJobCompleted {
3✔
752
                return err
1✔
753
        }
1✔
754
        klog.V(2).Infof("begin to copy blob container %s to %s", srcContainerName, dstContainerName)
1✔
755
        for {
2✔
756
                select {
1✔
757
                case <-timeTick:
1✔
758
                        jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv)
1✔
759
                        klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err)
1✔
760
                        switch jobState {
1✔
761
                        case util.AzcopyJobError, util.AzcopyJobCompleted:
1✔
762
                                return err
1✔
763
                        case util.AzcopyJobNotFound:
×
764
                                klog.V(2).Infof("copy blob container %s to %s", srcContainerName, dstContainerName)
×
765
                                cmd := exec.Command("azcopy", "copy", srcPath, dstPath, "--recursive", "--check-length=false")
×
766
                                if len(authAzcopyEnv) > 0 {
×
767
                                        cmd.Env = append(os.Environ(), authAzcopyEnv...)
×
768
                                }
×
769
                                out, copyErr := cmd.CombinedOutput()
×
770
                                if copyErr != nil {
×
771
                                        klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error(%v): %v", resourceGroupName, accountName, dstPath, copyErr, string(out))
×
772
                                } else {
×
773
                                        klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName)
×
774
                                }
×
775
                                return copyErr
×
776
                        }
777
                case <-timeAfter:
×
778
                        return fmt.Errorf("timeout waiting for copy blob container %s to %s succeed", srcContainerName, dstContainerName)
×
779
                }
780
        }
781
}
782

783
// copyVolume copies a volume form volume or snapshot, snapshot is not supported now
784
func (d *Driver) copyVolume(req *csi.CreateVolumeRequest, accountSASToken string, authAzcopyEnv []string, dstContainerName, storageEndpointSuffix string) error {
8✔
785
        vs := req.VolumeContentSource
8✔
786
        switch vs.Type.(type) {
8✔
787
        case *csi.VolumeContentSource_Snapshot:
2✔
788
                return status.Errorf(codes.InvalidArgument, "copy volume from volumeSnapshot is not supported")
2✔
789
        case *csi.VolumeContentSource_Volume:
6✔
790
                return d.copyBlobContainer(req, accountSASToken, authAzcopyEnv, dstContainerName, storageEndpointSuffix)
6✔
791
        default:
×
792
                return status.Errorf(codes.InvalidArgument, "%v is not a proper volume source", vs)
×
793
        }
794
}
795

796
// authorizeAzcopyWithIdentity returns auth env for azcopy using cluster identity
797
func (d *Driver) authorizeAzcopyWithIdentity() ([]string, error) {
7✔
798
        azureAuthConfig := d.cloud.Config.AzureAuthConfig
7✔
799
        var authAzcopyEnv []string
7✔
800
        if azureAuthConfig.UseManagedIdentityExtension {
11✔
801
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyAutoLoginType, MSI))
4✔
802
                if len(azureAuthConfig.UserAssignedIdentityID) > 0 {
5✔
803
                        klog.V(2).Infof("use user assigned managed identity to authorize azcopy")
1✔
804
                        authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyMSIClientID, azureAuthConfig.UserAssignedIdentityID))
1✔
805
                } else {
4✔
806
                        klog.V(2).Infof("use system-assigned managed identity to authorize azcopy")
3✔
807
                }
3✔
808
                return authAzcopyEnv, nil
4✔
809
        }
810
        if len(azureAuthConfig.AADClientSecret) > 0 {
5✔
811
                klog.V(2).Infof("use service principal to authorize azcopy")
2✔
812
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyAutoLoginType, SPN))
2✔
813
                if azureAuthConfig.AADClientID == "" || azureAuthConfig.TenantID == "" {
3✔
814
                        return []string{}, fmt.Errorf("AADClientID and TenantID must be set when use service principal")
1✔
815
                }
1✔
816
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopySPAApplicationID, azureAuthConfig.AADClientID))
1✔
817
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopySPAClientSecret, azureAuthConfig.AADClientSecret))
1✔
818
                authAzcopyEnv = append(authAzcopyEnv, fmt.Sprintf("%s=%s", azcopyTenantID, azureAuthConfig.TenantID))
1✔
819
                klog.V(2).Infof(fmt.Sprintf("set AZCOPY_SPA_APPLICATION_ID=%s, AZCOPY_TENANT_ID=%s successfully", azureAuthConfig.AADClientID, azureAuthConfig.TenantID))
1✔
820

1✔
821
                return authAzcopyEnv, nil
1✔
822
        }
823
        return []string{}, fmt.Errorf("service principle or managed identity are both not set")
1✔
824
}
825

826
// getAzcopyAuth will only generate sas token for azcopy in following conditions:
827
// 1. secrets is not empty
828
// 2. driver is not using managed identity and service principal
829
// 3. azcopy returns AuthorizationPermissionMismatch error when using service principal or managed identity
830
func (d *Driver) getAzcopyAuth(ctx context.Context, accountName, accountKey, storageEndpointSuffix string, accountOptions *azure.AccountOptions, secrets map[string]string, secretName, secretNamespace string) (string, []string, error) {
6✔
831
        var authAzcopyEnv []string
6✔
832
        useSasToken := false
6✔
833
        if len(secrets) == 0 && len(secretName) == 0 {
8✔
834
                var err error
2✔
835
                authAzcopyEnv, err = d.authorizeAzcopyWithIdentity()
2✔
836
                if err != nil {
2✔
837
                        klog.Warningf("failed to authorize azcopy with identity, error: %v", err)
×
838
                } else {
2✔
839
                        if len(authAzcopyEnv) > 0 {
4✔
840
                                // search in cache first
2✔
841
                                cache, err := d.azcopySasTokenCache.Get(accountName, azcache.CacheReadTypeDefault)
2✔
842
                                if err != nil {
2✔
843
                                        return "", nil, fmt.Errorf("get(%s) from azcopySasTokenCache failed with error: %v", accountName, err)
×
844
                                }
×
845
                                if cache != nil {
2✔
846
                                        klog.V(2).Infof("use sas token for account(%s) since this account is found in azcopySasTokenCache", accountName)
×
847
                                        useSasToken = true
×
848
                                } else {
2✔
849
                                        out, testErr := d.azcopy.TestListJobs(accountName, storageEndpointSuffix, authAzcopyEnv)
2✔
850
                                        if testErr != nil {
2✔
851
                                                return "", nil, fmt.Errorf("azcopy list command failed with error(%v): %v", testErr, out)
×
852
                                        }
×
853
                                        if strings.Contains(out, authorizationPermissionMismatch) {
2✔
854
                                                klog.Warningf("azcopy list failed with AuthorizationPermissionMismatch error, should assign \"Storage Blob Data Contributor\" role to controller identity, fall back to use sas token, original output: %v", out)
×
855
                                                d.azcopySasTokenCache.Set(accountName, "")
×
856
                                                useSasToken = true
×
857
                                        }
×
858
                                }
859
                        }
860
                }
861
        }
862

863
        if len(secrets) > 0 || len(secretName) > 0 || len(authAzcopyEnv) == 0 || useSasToken {
10✔
864
                var err error
4✔
865
                if accountKey == "" {
8✔
866
                        if _, accountKey, err = d.GetStorageAccesskey(ctx, accountOptions, secrets, secretName, secretNamespace); err != nil {
5✔
867
                                return "", nil, err
1✔
868
                        }
1✔
869
                }
870
                klog.V(2).Infof("generate sas token for account(%s)", accountName)
3✔
871
                sasToken, err := generateSASToken(accountName, accountKey, storageEndpointSuffix, d.sasTokenExpirationMinutes)
3✔
872
                return sasToken, nil, err
3✔
873
        }
874
        return "", authAzcopyEnv, nil
2✔
875
}
876

877
// isValidVolumeCapabilities validates the given VolumeCapability array is valid
878
func isValidVolumeCapabilities(volCaps []*csi.VolumeCapability) error {
29✔
879
        if len(volCaps) == 0 {
31✔
880
                return fmt.Errorf("volume capabilities missing in request")
2✔
881
        }
2✔
882
        for _, c := range volCaps {
54✔
883
                if c.GetBlock() != nil {
29✔
884
                        return fmt.Errorf("block volume capability not supported")
2✔
885
                }
2✔
886
        }
887
        return nil
25✔
888
}
889

890
func parseDays(dayStr string) (int32, error) {
3✔
891
        days, err := strconv.Atoi(dayStr)
3✔
892
        if err != nil {
4✔
893
                return 0, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid %s:%s in storage class", softDeleteBlobsField, dayStr))
1✔
894
        }
1✔
895
        if days <= 0 || days > 365 {
3✔
896
                return 0, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid %s:%s in storage class, should be in range [1, 365]", softDeleteBlobsField, dayStr))
1✔
897
        }
1✔
898

899
        return int32(days), nil
1✔
900
}
901

902
// generateSASToken generate a sas token for storage account
903
func generateSASToken(accountName, accountKey, storageEndpointSuffix string, expiryTime int) (string, error) {
5✔
904
        credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
5✔
905
        if err != nil {
8✔
906
                return "", status.Errorf(codes.Internal, fmt.Sprintf("failed to generate sas token in creating new shared key credential, accountName: %s, err: %s", accountName, err.Error()))
3✔
907
        }
3✔
908
        serviceClient, err := service.NewClientWithSharedKeyCredential(fmt.Sprintf("https://%s.blob.%s/", accountName, storageEndpointSuffix), credential, nil)
2✔
909
        if err != nil {
2✔
910
                return "", status.Errorf(codes.Internal, fmt.Sprintf("failed to generate sas token in creating new client with shared key credential, accountName: %s, err: %s", accountName, err.Error()))
×
911
        }
×
912
        sasURL, err := serviceClient.GetSASURL(
2✔
913
                sas.AccountResourceTypes{Object: true, Service: false, Container: true},
2✔
914
                sas.AccountPermissions{Read: true, List: true, Write: true},
2✔
915
                time.Now().Add(time.Duration(expiryTime)*time.Minute),
2✔
916
                &service.GetSASURLOptions{StartTime: to.Ptr(time.Now())},
2✔
917
        )
2✔
918
        if err != nil {
2✔
919
                return "", err
×
920
        }
×
921
        u, err := url.Parse(sasURL)
2✔
922
        if err != nil {
2✔
923
                return "", err
×
924
        }
×
925
        return "?" + u.RawQuery, nil
2✔
926
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc