• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubernetes-sigs / blob-csi-driver / 13386305256

18 Feb 2025 08:27AM UTC coverage: 77.861%. Remained the same
13386305256

Pull #1839

github

andyzhangx
feat: optimize azcopy perf in volume cloning scenario
Pull Request #1839: feat: optimize azcopy perf in volume cloning scenario

2286 of 2936 relevant lines covered (77.86%)

7.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.82
/pkg/blob/azure.go
1
/*
2
Copyright 2017 The Kubernetes Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package blob
18

19
import (
20
        "fmt"
21
        "os"
22
        "strings"
23

24
        "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
25
        network "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v6"
26
        "github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/azsecrets"
27
        "github.com/Azure/azure-sdk-for-go/storage"
28
        azure2 "github.com/Azure/go-autorest/autorest/azure"
29
        "golang.org/x/net/context"
30
        "k8s.io/client-go/kubernetes"
31
        "k8s.io/klog/v2"
32
        "k8s.io/utils/ptr"
33
        "sigs.k8s.io/cloud-provider-azure/pkg/azclient"
34
        "sigs.k8s.io/cloud-provider-azure/pkg/azclient/configloader"
35
        azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache"
36
        azure "sigs.k8s.io/cloud-provider-azure/pkg/provider"
37
        azureconfig "sigs.k8s.io/cloud-provider-azure/pkg/provider/config"
38
)
39

40
var (
	// DefaultAzureCredentialFileEnv is the name of the environment variable
	// that may hold the path of the Azure cloud config file.
	DefaultAzureCredentialFileEnv = "AZURE_CREDENTIAL_FILE"

	// DefaultCredFilePath is the cloud config file path used when
	// DefaultAzureCredentialFileEnv is unset or blank.
	DefaultCredFilePath = "/etc/kubernetes/azure.json"

	// storageService is the service name of the storage service endpoint that
	// is added to subnets. It is a variable (not a constant) because its
	// address is taken when building the service endpoint.
	storageService = "Microsoft.Storage"
)
45

46
// IsAzureStackCloud decides whether the driver is running on Azure Stack Cloud.
47
func IsAzureStackCloud(cloud *azure.Cloud) bool {
10✔
48
        return !cloud.DisableAzureStackCloud && strings.EqualFold(cloud.Cloud, "AZURESTACKCLOUD")
10✔
49
}
10✔
50

51
// getCloudProvider get Azure Cloud Provider
52
func GetCloudProvider(ctx context.Context, kubeClient kubernetes.Interface, nodeID, secretName, secretNamespace, userAgent string, allowEmptyCloudConfig bool) (*azure.Cloud, error) {
5✔
53
        var (
5✔
54
                config     *azureconfig.Config
5✔
55
                fromSecret bool
5✔
56
                err        error
5✔
57
        )
5✔
58

5✔
59
        az := &azure.Cloud{}
5✔
60
        az.Environment.StorageEndpointSuffix = storage.DefaultBaseURL
5✔
61

5✔
62
        if kubeClient != nil {
7✔
63
                az.KubeClient = kubeClient
2✔
64
                klog.V(2).Infof("reading cloud config from secret %s/%s", secretNamespace, secretName)
2✔
65
                config, err = configloader.Load[azureconfig.Config](ctx, &configloader.K8sSecretLoaderConfig{
2✔
66
                        K8sSecretConfig: configloader.K8sSecretConfig{
2✔
67
                                SecretName:      secretName,
2✔
68
                                SecretNamespace: secretNamespace,
2✔
69
                                CloudConfigKey:  "cloud-config",
2✔
70
                        },
2✔
71
                        KubeClient: kubeClient,
2✔
72
                }, nil)
2✔
73
                if err == nil && config != nil {
2✔
74
                        fromSecret = true
×
75
                }
×
76
                if err != nil {
4✔
77
                        klog.V(2).Infof("InitializeCloudFromSecret: failed to get cloud config from secret %s/%s: %v", secretNamespace, secretName, err)
2✔
78
                }
2✔
79
        }
80

81
        if config == nil {
10✔
82
                klog.V(2).Infof("could not read cloud config from secret %s/%s", secretNamespace, secretName)
5✔
83
                credFile, ok := os.LookupEnv(DefaultAzureCredentialFileEnv)
5✔
84
                if ok && strings.TrimSpace(credFile) != "" {
7✔
85
                        klog.V(2).Infof("%s env var set as %v", DefaultAzureCredentialFileEnv, credFile)
2✔
86
                } else {
5✔
87
                        credFile = DefaultCredFilePath
3✔
88
                        klog.V(2).Infof("use default %s env var: %v", DefaultAzureCredentialFileEnv, credFile)
3✔
89
                }
3✔
90

91
                config, err = configloader.Load[azureconfig.Config](ctx, nil, &configloader.FileLoaderConfig{
5✔
92
                        FilePath: credFile,
5✔
93
                })
5✔
94
                if err != nil {
8✔
95
                        klog.Warningf("load azure config from file(%s) failed with %v", credFile, err)
3✔
96
                }
3✔
97
        }
98

99
        if config == nil {
8✔
100
                if allowEmptyCloudConfig {
5✔
101
                        klog.V(2).Infof("no cloud config provided, error: %v, driver will run without cloud config", err)
2✔
102
                } else {
3✔
103
                        return az, fmt.Errorf("no cloud config provided, error: %w", err)
1✔
104
                }
1✔
105
        } else {
2✔
106
                config.UserAgent = userAgent
2✔
107
                config.CloudProviderBackoff = true
2✔
108
                // these environment variables are injected by workload identity webhook
2✔
109
                if tenantID := os.Getenv("AZURE_TENANT_ID"); tenantID != "" {
3✔
110
                        config.TenantID = tenantID
1✔
111
                }
1✔
112
                if clientID := os.Getenv("AZURE_CLIENT_ID"); clientID != "" {
3✔
113
                        config.AADClientID = clientID
1✔
114
                }
1✔
115
                if federatedTokenFile := os.Getenv("AZURE_FEDERATED_TOKEN_FILE"); federatedTokenFile != "" {
3✔
116
                        config.AADFederatedTokenFile = federatedTokenFile
1✔
117
                        config.UseFederatedWorkloadIdentityExtension = true
1✔
118
                }
1✔
119
                if err = az.InitializeCloudFromConfig(ctx, config, fromSecret, false); err != nil {
2✔
120
                        klog.Warningf("InitializeCloudFromConfig failed with error: %v", err)
×
121
                }
×
122
        }
123

124
        // reassign kubeClient
125
        if kubeClient != nil && az.KubeClient == nil {
4✔
126
                az.KubeClient = kubeClient
×
127
        }
×
128

129
        isController := (nodeID == "")
4✔
130
        if isController {
8✔
131
                if err == nil {
6✔
132
                        // Disable UseInstanceMetadata for controller to mitigate a timeout issue using IMDS
2✔
133
                        // https://github.com/kubernetes-sigs/azuredisk-csi-driver/issues/168
2✔
134
                        klog.V(2).Infof("disable UseInstanceMetadata for controller server")
2✔
135
                        az.Config.UseInstanceMetadata = false
2✔
136
                }
2✔
137
                klog.V(2).Infof("starting controller server...")
4✔
138
        } else {
×
139
                klog.V(2).Infof("starting node server on node(%s)", nodeID)
×
140
        }
×
141

142
        if az.Environment.StorageEndpointSuffix == "" {
4✔
143
                az.Environment.StorageEndpointSuffix = storage.DefaultBaseURL
×
144
        }
×
145
        return az, nil
4✔
146
}
147

148
// getKeyVaultSecretContent get content of the keyvault secret
149
func (d *Driver) getKeyVaultSecretContent(ctx context.Context, vaultURL string, secretName string, secretVersion string) (content string, err error) {
3✔
150
        var authProvider *azclient.AuthProvider
3✔
151
        authProvider, err = azclient.NewAuthProvider(&d.cloud.ARMClientConfig, &d.cloud.AzureAuthConfig)
3✔
152
        if err != nil {
4✔
153
                return "", err
1✔
154
        }
1✔
155
        kvClient, err := azsecrets.NewClient(vaultURL, authProvider.GetAzIdentity(), nil)
2✔
156
        if err != nil {
2✔
157
                return "", fmt.Errorf("failed to get keyvaultClient: %w", err)
×
158
        }
×
159

160
        klog.V(2).Infof("get secret from vaultURL(%v), sercretName(%v), secretVersion(%v)", vaultURL, secretName, secretVersion)
2✔
161
        secret, err := kvClient.GetSecret(ctx, secretName, secretVersion, nil)
2✔
162
        if err != nil {
4✔
163
                return "", fmt.Errorf("get secret from vaultURL(%v), sercretName(%v), secretVersion(%v) failed with error: %w", vaultURL, secretName, secretVersion, err)
2✔
164
        }
2✔
165
        return *secret.Value, nil
×
166
}
167

168
func (d *Driver) updateSubnetServiceEndpoints(ctx context.Context, vnetResourceGroup, vnetName, subnetName string) ([]string, error) {
5✔
169
        var vnetResourceIDs []string
5✔
170
        if d.networkClientFactory == nil {
6✔
171
                return vnetResourceIDs, fmt.Errorf("networkClientFactory is nil")
1✔
172
        }
1✔
173

174
        if vnetResourceGroup == "" {
8✔
175
                vnetResourceGroup = d.cloud.ResourceGroup
4✔
176
                if len(d.cloud.VnetResourceGroup) > 0 {
4✔
177
                        vnetResourceGroup = d.cloud.VnetResourceGroup
×
178
                }
×
179
        }
180

181
        location := d.cloud.Location
4✔
182
        if vnetName == "" {
8✔
183
                vnetName = d.cloud.VnetName
4✔
184
        }
4✔
185

186
        klog.V(2).Infof("updateSubnetServiceEndpoints on vnetName: %s, subnetName: %s, location: %s", vnetName, subnetName, location)
4✔
187
        if vnetName == "" || location == "" {
4✔
188
                return vnetResourceIDs, fmt.Errorf("vnetName or location is empty")
×
189
        }
×
190

191
        lockKey := vnetResourceGroup + vnetName + subnetName
4✔
192
        cache, err := d.subnetCache.Get(ctx, lockKey, azcache.CacheReadTypeDefault)
4✔
193
        if err != nil {
4✔
194
                return nil, err
×
195
        }
×
196
        if cache != nil {
6✔
197
                vnetResourceIDs = cache.([]string)
2✔
198
                klog.V(2).Infof("subnet %s under vnet %s in rg %s is already updated, vnetResourceIDs: %v", subnetName, vnetName, vnetResourceGroup, vnetResourceIDs)
2✔
199
                return vnetResourceIDs, nil
2✔
200
        }
2✔
201

202
        d.subnetLockMap.LockEntry(lockKey)
2✔
203
        defer d.subnetLockMap.UnlockEntry(lockKey)
2✔
204

2✔
205
        var subnets []*network.Subnet
2✔
206
        if subnetName != "" {
4✔
207
                // list multiple subnets separated by comma
2✔
208
                subnetNames := strings.Split(subnetName, ",")
2✔
209
                for _, sn := range subnetNames {
4✔
210
                        sn = strings.TrimSpace(sn)
2✔
211
                        subnet, rerr := d.networkClientFactory.GetSubnetClient().Get(ctx, vnetResourceGroup, vnetName, sn, nil)
2✔
212
                        if rerr != nil {
2✔
213
                                return vnetResourceIDs, fmt.Errorf("failed to get the subnet %s under rg %s vnet %s: %v", subnetName, vnetResourceGroup, vnetName, rerr.Error())
×
214
                        }
×
215
                        subnets = append(subnets, subnet)
2✔
216
                }
217
        } else {
×
218
                var rerr error
×
219
                subnets, rerr = d.networkClientFactory.GetSubnetClient().List(ctx, vnetResourceGroup, vnetName)
×
220
                if rerr != nil {
×
221
                        return vnetResourceIDs, fmt.Errorf("failed to list the subnets under rg %s vnet %s: %v", vnetResourceGroup, vnetName, rerr.Error())
×
222
                }
×
223
        }
224

225
        for _, subnet := range subnets {
4✔
226
                if subnet.Name == nil {
3✔
227
                        return vnetResourceIDs, fmt.Errorf("subnet name is nil")
1✔
228
                }
1✔
229
                sn := *subnet.Name
1✔
230
                vnetResourceID := d.getSubnetResourceID(vnetResourceGroup, vnetName, sn)
1✔
231
                klog.V(2).Infof("set vnetResourceID %s", vnetResourceID)
1✔
232
                vnetResourceIDs = append(vnetResourceIDs, vnetResourceID)
1✔
233

1✔
234
                endpointLocaions := []*string{to.Ptr(location)}
1✔
235
                storageServiceEndpoint := &network.ServiceEndpointPropertiesFormat{
1✔
236
                        Service:   &storageService,
1✔
237
                        Locations: endpointLocaions,
1✔
238
                }
1✔
239
                storageServiceExists := false
1✔
240
                if subnet.Properties == nil {
1✔
241
                        subnet.Properties = &network.SubnetPropertiesFormat{}
×
242
                }
×
243
                if subnet.Properties.ServiceEndpoints == nil {
2✔
244
                        subnet.Properties.ServiceEndpoints = []*network.ServiceEndpointPropertiesFormat{}
1✔
245
                }
1✔
246
                serviceEndpoints := subnet.Properties.ServiceEndpoints
1✔
247
                for _, v := range serviceEndpoints {
1✔
248
                        if strings.HasPrefix(ptr.Deref(v.Service, ""), storageService) {
×
249
                                storageServiceExists = true
×
250
                                klog.V(4).Infof("serviceEndpoint(%s) is already in subnet(%s)", storageService, sn)
×
251
                                break
×
252
                        }
253
                }
254

255
                if !storageServiceExists {
2✔
256
                        serviceEndpoints = append(serviceEndpoints, storageServiceEndpoint)
1✔
257
                        subnet.Properties.ServiceEndpoints = serviceEndpoints
1✔
258

1✔
259
                        klog.V(2).Infof("begin to update the subnet %s under vnet %s in rg %s", sn, vnetName, vnetResourceGroup)
1✔
260
                        if _, err := d.networkClientFactory.GetSubnetClient().CreateOrUpdate(ctx, vnetResourceGroup, vnetName, sn, *subnet); err != nil {
1✔
261
                                return vnetResourceIDs, fmt.Errorf("failed to update the subnet %s under vnet %s: %v", sn, vnetName, err)
×
262
                        }
×
263
                }
264
        }
265
        // cache the subnet update
266
        d.subnetCache.Set(lockKey, vnetResourceIDs)
1✔
267
        return vnetResourceIDs, nil
1✔
268
}
269

270
func (d *Driver) getStorageEndPointSuffix() string {
15✔
271
        if d.cloud == nil || d.cloud.Environment.StorageEndpointSuffix == "" {
27✔
272
                return defaultStorageEndPointSuffix
12✔
273
        }
12✔
274
        return d.cloud.Environment.StorageEndpointSuffix
3✔
275
}
276

277
func (d *Driver) getCloudEnvironment() azure2.Environment {
7✔
278
        if d.cloud == nil {
8✔
279
                return azure2.PublicCloud
1✔
280
        }
1✔
281
        return d.cloud.Environment
6✔
282
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc