• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubernetes-sigs / azuredisk-csi-driver / 14463249672

15 Apr 2025 06:57AM UTC coverage: 75.675% (-0.06%) from 75.735%
14463249672

Pull #3024

github

andyzhangx
fix: remove unnecessary get vmss call during disk attach
Pull Request #3024: [release-1.32] fix: remove unnecessary get vmss call during disk attach

3758 of 4966 relevant lines covered (75.67%)

6.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.68
/pkg/azuredisk/azuredisk.go
1
/*
2
Copyright 2017 The Kubernetes Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package azuredisk
18

19
import (
20
        "context"
21
        "encoding/json"
22
        "errors"
23
        "flag"
24
        "fmt"
25
        "reflect"
26
        "strconv"
27
        "strings"
28
        "time"
29

30
        "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v6"
31
        "github.com/container-storage-interface/spec/lib/go/csi"
32
        "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
33

34
        "google.golang.org/grpc"
35
        "google.golang.org/grpc/codes"
36
        "google.golang.org/grpc/status"
37

38
        grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
39
        corev1 "k8s.io/api/core/v1"
40
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
41
        "k8s.io/apimachinery/pkg/types"
42
        k8stypes "k8s.io/apimachinery/pkg/types"
43
        "k8s.io/apimachinery/pkg/util/wait"
44
        "k8s.io/client-go/kubernetes"
45
        clientset "k8s.io/client-go/kubernetes"
46
        "k8s.io/klog/v2"
47
        "k8s.io/kubernetes/pkg/volume/util/hostutil"
48
        "k8s.io/mount-utils"
49
        "k8s.io/utils/ptr"
50

51
        consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants"
52
        "sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils"
53
        csicommon "sigs.k8s.io/azuredisk-csi-driver/pkg/csi-common"
54
        "sigs.k8s.io/azuredisk-csi-driver/pkg/mounter"
55
        "sigs.k8s.io/azuredisk-csi-driver/pkg/optimization"
56
        volumehelper "sigs.k8s.io/azuredisk-csi-driver/pkg/util"
57
        "sigs.k8s.io/cloud-provider-azure/pkg/azclient"
58
        azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache"
59
        azurecloudconsts "sigs.k8s.io/cloud-provider-azure/pkg/consts"
60
        azure "sigs.k8s.io/cloud-provider-azure/pkg/provider"
61
)
62

63
var (
64
        useDriverV2 = flag.Bool("temp-use-driver-v2", false, "A temporary flag to enable early test and development of Azure Disk CSI Driver V2. This will be removed in the future.")
65

66
        // taintRemovalBackoff is the exponential backoff configuration for node taint removal
67
        taintRemovalBackoff = wait.Backoff{
68
                Duration: 500 * time.Millisecond,
69
                Factor:   2,
70
                Steps:    10, // Max delay = 0.5 * 2^9 = ~4 minutes
71
        }
72
)
73

74
// CSIDriver defines the interface for a CSI driver.
75
type CSIDriver interface {
76
        csi.ControllerServer
77
        csi.NodeServer
78
        csi.IdentityServer
79

80
        Run(ctx context.Context) error
81
}
82

83
// hostUtil is the single-method subset of host utilities the driver needs.
// newDriverV1 fills it with hostutil.NewHostUtil().
type hostUtil interface {
	// PathIsDevice reports whether path refers to a device.
	PathIsDevice(path string) (bool, error)
}
86

87
// DriverCore contains fields common to both the V1 and V2 driver, and implements all interfaces of CSI drivers
88
type DriverCore struct {
89
        csicommon.CSIDriver
90
        // Embed UnimplementedXXXServer to ensure the driver returns Unimplemented for any
91
        // new RPC methods that might be introduced in future versions of the spec.
92
        csi.UnimplementedControllerServer
93
        csi.UnimplementedIdentityServer
94
        csi.UnimplementedNodeServer
95

96
        perfOptimizationEnabled      bool
97
        cloudConfigSecretName        string
98
        cloudConfigSecretNamespace   string
99
        customUserAgent              string
100
        userAgentSuffix              string
101
        cloud                        *azure.Cloud
102
        clientFactory                azclient.ClientFactory
103
        diskController               *ManagedDiskController
104
        mounter                      *mount.SafeFormatAndMount
105
        deviceHelper                 optimization.Interface
106
        nodeInfo                     *optimization.NodeInfo
107
        ioHandler                    azureutils.IOHandler
108
        hostUtil                     hostUtil
109
        useCSIProxyGAInterface       bool
110
        enableDiskOnlineResize       bool
111
        allowEmptyCloudConfig        bool
112
        enableListVolumes            bool
113
        enableListSnapshots          bool
114
        supportZone                  bool
115
        getNodeInfoFromLabels        bool
116
        enableDiskCapacityCheck      bool
117
        enableTrafficManager         bool
118
        trafficManagerPort           int64
119
        vmssCacheTTLInSeconds        int64
120
        volStatsCacheExpireInMinutes int64
121
        attachDetachInitialDelayInMs int64
122
        getDiskTimeoutInSeconds      int64
123
        vmType                       string
124
        enableWindowsHostProcess     bool
125
        listDisksUsingWinCIM         bool
126
        getNodeIDFromIMDS            bool
127
        enableOtelTracing            bool
128
        shouldWaitForSnapshotReady   bool
129
        checkDiskLUNCollision        bool
130
        forceDetachBackoff           bool
131
        waitForDetach                bool
132
        endpoint                     string
133
        disableAVSetNodes            bool
134
        removeNotReadyTaint          bool
135
        kubeClient                   kubernetes.Interface
136
        // a timed cache storing volume stats <volumeID, volumeStats>
137
        volStatsCache           azcache.Resource
138
        maxConcurrentFormat     int64
139
        concurrentFormatTimeout int64
140
}
141

142
// Driver is the v1 implementation of the Azure Disk CSI Driver.
143
type Driver struct {
144
        DriverCore
145
        volumeLocks *volumehelper.VolumeLocks
146
        // a timed cache for throttling
147
        throttlingCache azcache.Resource
148
        // a timed cache for disk lun collision check throttling
149
        checkDiskLunThrottlingCache azcache.Resource
150
}
151

152
// newDriverV1 Creates a NewCSIDriver object. Assumes vendor version is equal to driver version &
153
// does not support optional driver plugin info manifest field. Refer to CSI spec for more details.
154
func newDriverV1(options *DriverOptions) *Driver {
3✔
155
        driver := Driver{}
3✔
156
        driver.Name = options.DriverName
3✔
157
        driver.Version = driverVersion
3✔
158
        driver.NodeID = options.NodeID
3✔
159
        driver.VolumeAttachLimit = options.VolumeAttachLimit
3✔
160
        driver.ReservedDataDiskSlotNum = options.ReservedDataDiskSlotNum
3✔
161
        driver.perfOptimizationEnabled = options.EnablePerfOptimization
3✔
162
        driver.cloudConfigSecretName = options.CloudConfigSecretName
3✔
163
        driver.cloudConfigSecretNamespace = options.CloudConfigSecretNamespace
3✔
164
        driver.customUserAgent = options.CustomUserAgent
3✔
165
        driver.userAgentSuffix = options.UserAgentSuffix
3✔
166
        driver.useCSIProxyGAInterface = options.UseCSIProxyGAInterface
3✔
167
        driver.enableDiskOnlineResize = options.EnableDiskOnlineResize
3✔
168
        driver.allowEmptyCloudConfig = options.AllowEmptyCloudConfig
3✔
169
        driver.enableListVolumes = options.EnableListVolumes
3✔
170
        driver.enableListSnapshots = options.EnableListVolumes
3✔
171
        driver.supportZone = options.SupportZone
3✔
172
        driver.getNodeInfoFromLabels = options.GetNodeInfoFromLabels
3✔
173
        driver.enableDiskCapacityCheck = options.EnableDiskCapacityCheck
3✔
174
        driver.attachDetachInitialDelayInMs = options.AttachDetachInitialDelayInMs
3✔
175
        driver.enableTrafficManager = options.EnableTrafficManager
3✔
176
        driver.trafficManagerPort = options.TrafficManagerPort
3✔
177
        driver.vmssCacheTTLInSeconds = options.VMSSCacheTTLInSeconds
3✔
178
        driver.volStatsCacheExpireInMinutes = options.VolStatsCacheExpireInMinutes
3✔
179
        driver.getDiskTimeoutInSeconds = options.GetDiskTimeoutInSeconds
3✔
180
        driver.vmType = options.VMType
3✔
181
        driver.enableWindowsHostProcess = options.EnableWindowsHostProcess
3✔
182
        driver.listDisksUsingWinCIM = options.ListDisksUsingWinCIM
3✔
183
        driver.getNodeIDFromIMDS = options.GetNodeIDFromIMDS
3✔
184
        driver.enableOtelTracing = options.EnableOtelTracing
3✔
185
        driver.shouldWaitForSnapshotReady = options.WaitForSnapshotReady
3✔
186
        driver.checkDiskLUNCollision = options.CheckDiskLUNCollision
3✔
187
        driver.forceDetachBackoff = options.ForceDetachBackoff
3✔
188
        driver.waitForDetach = options.WaitForDetach
3✔
189
        driver.endpoint = options.Endpoint
3✔
190
        driver.disableAVSetNodes = options.DisableAVSetNodes
3✔
191
        driver.removeNotReadyTaint = options.RemoveNotReadyTaint
3✔
192
        driver.maxConcurrentFormat = options.MaxConcurrentFormat
3✔
193
        driver.concurrentFormatTimeout = options.ConcurrentFormatTimeout
3✔
194
        driver.volumeLocks = volumehelper.NewVolumeLocks()
3✔
195
        driver.ioHandler = azureutils.NewOSIOHandler()
3✔
196
        driver.hostUtil = hostutil.NewHostUtil()
3✔
197
        if driver.NodeID == "" {
6✔
198
                // nodeid is not needed in controller component
3✔
199
                klog.Warning("nodeid is empty")
3✔
200
        }
3✔
201
        topologyKey = fmt.Sprintf("topology.%s/zone", driver.Name)
3✔
202

3✔
203
        getter := func(_ context.Context, _ string) (interface{}, error) { return nil, nil }
3✔
204
        var err error
3✔
205
        if driver.throttlingCache, err = azcache.NewTimedCache(5*time.Minute, getter, false); err != nil {
3✔
206
                klog.Fatalf("%v", err)
×
207
        }
×
208
        if driver.checkDiskLunThrottlingCache, err = azcache.NewTimedCache(30*time.Minute, getter, false); err != nil {
3✔
209
                klog.Fatalf("%v", err)
×
210
        }
×
211

212
        if options.VolStatsCacheExpireInMinutes <= 0 {
6✔
213
                options.VolStatsCacheExpireInMinutes = 10 // default expire in 10 minutes
3✔
214
        }
3✔
215
        if driver.volStatsCache, err = azcache.NewTimedCache(time.Duration(options.VolStatsCacheExpireInMinutes)*time.Minute, getter, false); err != nil {
3✔
216
                klog.Fatalf("%v", err)
×
217
        }
×
218

219
        userAgent := GetUserAgent(driver.Name, driver.customUserAgent, driver.userAgentSuffix)
3✔
220
        klog.V(2).Infof("driver userAgent: %s", userAgent)
3✔
221

3✔
222
        kubeClient, err := azureutils.GetKubeClient(options.Kubeconfig)
3✔
223
        if err != nil {
6✔
224
                klog.Warningf("get kubeconfig(%s) failed with error: %v", options.Kubeconfig, err)
3✔
225
        }
3✔
226
        driver.kubeClient = kubeClient
3✔
227

3✔
228
        cloud, err := azureutils.GetCloudProviderFromClient(context.Background(), kubeClient, driver.cloudConfigSecretName, driver.cloudConfigSecretNamespace,
3✔
229
                userAgent, driver.allowEmptyCloudConfig, driver.enableTrafficManager, driver.trafficManagerPort)
3✔
230
        if err != nil {
3✔
231
                klog.Fatalf("failed to get Azure Cloud Provider, error: %v", err)
×
232
        }
×
233
        driver.cloud = cloud
3✔
234

3✔
235
        if driver.cloud != nil {
6✔
236
                driver.clientFactory = driver.cloud.ComputeClientFactory
3✔
237
                if driver.vmType != "" {
5✔
238
                        klog.V(2).Infof("override VMType(%s) in cloud config as %s", driver.cloud.VMType, driver.vmType)
2✔
239
                        driver.cloud.VMType = driver.vmType
2✔
240
                }
2✔
241

242
                if driver.NodeID == "" {
6✔
243
                        // Disable UseInstanceMetadata for controller to mitigate a timeout issue using IMDS
3✔
244
                        // https://github.com/kubernetes-sigs/azuredisk-csi-driver/issues/168
3✔
245
                        klog.V(2).Infof("disable UseInstanceMetadata for controller")
3✔
246
                        driver.cloud.Config.UseInstanceMetadata = false
3✔
247

3✔
248
                        if driver.cloud.VMType == azurecloudconsts.VMTypeStandard && driver.cloud.DisableAvailabilitySetNodes {
3✔
249
                                klog.V(2).Infof("set DisableAvailabilitySetNodes as false since VMType is %s", driver.cloud.VMType)
×
250
                                driver.cloud.DisableAvailabilitySetNodes = false
×
251
                        }
×
252

253
                        if driver.cloud.VMType == azurecloudconsts.VMTypeVMSS && !driver.cloud.DisableAvailabilitySetNodes && driver.disableAVSetNodes {
3✔
254
                                klog.V(2).Infof("DisableAvailabilitySetNodes for controller since current VMType is vmss")
×
255
                                driver.cloud.DisableAvailabilitySetNodes = true
×
256
                        }
×
257
                        klog.V(2).Infof("cloud: %s, location: %s, rg: %s, VMType: %s, PrimaryScaleSetName: %s, PrimaryAvailabilitySetName: %s, DisableAvailabilitySetNodes: %v", driver.cloud.Cloud, driver.cloud.Location, driver.cloud.ResourceGroup, driver.cloud.VMType, driver.cloud.PrimaryScaleSetName, driver.cloud.PrimaryAvailabilitySetName, driver.cloud.DisableAvailabilitySetNodes)
3✔
258
                }
259

260
                if driver.vmssCacheTTLInSeconds > 0 {
5✔
261
                        klog.V(2).Infof("reset vmssCacheTTLInSeconds as %d", driver.vmssCacheTTLInSeconds)
2✔
262
                        driver.cloud.VMCacheTTLInSeconds = int(driver.vmssCacheTTLInSeconds)
2✔
263
                        driver.cloud.VmssCacheTTLInSeconds = int(driver.vmssCacheTTLInSeconds)
2✔
264
                }
2✔
265

266
                driver.diskController = NewManagedDiskController(driver.cloud)
3✔
267
                driver.diskController.AttachDetachInitialDelayInMs = int(driver.attachDetachInitialDelayInMs)
3✔
268
                driver.diskController.ForceDetachBackoff = driver.forceDetachBackoff
3✔
269
                driver.diskController.WaitForDetach = driver.waitForDetach
3✔
270
        }
271

272
        driver.deviceHelper = optimization.NewSafeDeviceHelper()
3✔
273

3✔
274
        if driver.getPerfOptimizationEnabled() {
5✔
275
                driver.nodeInfo, err = optimization.NewNodeInfo(context.TODO(), driver.getCloud(), driver.NodeID)
2✔
276
                if err != nil {
4✔
277
                        klog.Warningf("Failed to get node info. Error: %v", err)
2✔
278
                }
2✔
279
        }
280

281
        driver.mounter, err = mounter.NewSafeMounter(driver.enableWindowsHostProcess, driver.listDisksUsingWinCIM, driver.useCSIProxyGAInterface, int(driver.maxConcurrentFormat), time.Duration(driver.concurrentFormatTimeout)*time.Second)
3✔
282
        if err != nil {
3✔
283
                klog.Fatalf("Failed to get safe mounter. Error: %v", err)
×
284
        }
×
285

286
        controllerCap := []csi.ControllerServiceCapability_RPC_Type{
3✔
287
                csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
3✔
288
                csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME,
3✔
289
                csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT,
3✔
290
                csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
3✔
291
                csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
3✔
292
                csi.ControllerServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER,
3✔
293
                csi.ControllerServiceCapability_RPC_MODIFY_VOLUME,
3✔
294
        }
3✔
295
        if driver.enableListVolumes {
5✔
296
                controllerCap = append(controllerCap, csi.ControllerServiceCapability_RPC_LIST_VOLUMES, csi.ControllerServiceCapability_RPC_LIST_VOLUMES_PUBLISHED_NODES)
2✔
297
        }
2✔
298
        if driver.enableListSnapshots {
5✔
299
                controllerCap = append(controllerCap, csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS)
2✔
300
        }
2✔
301

302
        driver.AddControllerServiceCapabilities(controllerCap)
3✔
303
        driver.AddVolumeCapabilityAccessModes(
3✔
304
                []csi.VolumeCapability_AccessMode_Mode{
3✔
305
                        csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
3✔
306
                        csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY,
3✔
307
                        csi.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER,
3✔
308
                        csi.VolumeCapability_AccessMode_SINGLE_NODE_MULTI_WRITER,
3✔
309
                })
3✔
310
        driver.AddNodeServiceCapabilities([]csi.NodeServiceCapability_RPC_Type{
3✔
311
                csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
3✔
312
                csi.NodeServiceCapability_RPC_EXPAND_VOLUME,
3✔
313
                csi.NodeServiceCapability_RPC_GET_VOLUME_STATS,
3✔
314
                csi.NodeServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER,
3✔
315
        })
3✔
316

3✔
317
        if kubeClient != nil && driver.removeNotReadyTaint && driver.NodeID != "" {
3✔
318
                // Remove taint from node to indicate driver startup success
×
319
                // This is done at the last possible moment to prevent race conditions or false positive removals
×
320
                time.AfterFunc(time.Duration(options.TaintRemovalInitialDelayInSeconds)*time.Second, func() {
×
321
                        removeTaintInBackground(kubeClient, driver.NodeID, driver.Name, taintRemovalBackoff, removeNotReadyTaint)
×
322
                })
×
323
        }
324
        return &driver
3✔
325
}
326

327
// Run driver initialization
328
func (d *Driver) Run(ctx context.Context) error {
5✔
329
        versionMeta, err := GetVersionYAML(d.Name)
5✔
330
        if err != nil {
5✔
331
                klog.Fatalf("%v", err)
×
332
        }
×
333
        klog.Infof("\nDRIVER INFORMATION:\n-------------------\n%s\n\nStreaming logs below:", versionMeta)
5✔
334

5✔
335
        opts := []grpc.ServerOption{
5✔
336
                grpc.ChainUnaryInterceptor(
5✔
337
                        grpcprom.NewServerMetrics().UnaryServerInterceptor(),
5✔
338
                        csicommon.LogGRPC,
5✔
339
                ),
5✔
340
        }
5✔
341
        if d.enableOtelTracing {
5✔
342
                exporter, err := InitOtelTracing()
×
343
                if err != nil {
×
344
                        klog.Fatalf("Failed to initialize otel tracing: %v", err)
×
345
                }
×
346
                // Exporter will flush traces on shutdown
347
                defer func() {
×
348
                        if err := exporter.Shutdown(context.Background()); err != nil {
×
349
                                klog.Errorf("Could not shutdown otel exporter: %v", err)
×
350
                        }
×
351
                }()
352
                opts = append(opts, grpc.StatsHandler(otelgrpc.NewServerHandler()))
×
353
        }
354

355
        s := grpc.NewServer(opts...)
5✔
356
        csi.RegisterIdentityServer(s, d)
5✔
357
        csi.RegisterControllerServer(s, d)
5✔
358
        csi.RegisterNodeServer(s, d)
5✔
359

5✔
360
        go func() {
10✔
361
                //graceful shutdown
5✔
362
                <-ctx.Done()
5✔
363
                s.GracefulStop()
5✔
364
        }()
5✔
365
        // Driver d act as IdentityServer, ControllerServer and NodeServer
366
        listener, err := csicommon.Listen(ctx, d.endpoint)
5✔
367
        if err != nil {
5✔
368
                klog.Fatalf("failed to listen to endpoint, error: %v", err)
×
369
        }
×
370
        err = s.Serve(listener)
5✔
371
        if errors.Is(err, grpc.ErrServerStopped) {
5✔
372
                klog.Infof("gRPC server stopped serving")
×
373
                return nil
×
374
        }
×
375
        return err
5✔
376
}
377

378
func (d *Driver) isGetDiskThrottled(ctx context.Context) bool {
23✔
379
        cache, err := d.throttlingCache.Get(ctx, consts.GetDiskThrottlingKey, azcache.CacheReadTypeDefault)
23✔
380
        if err != nil {
23✔
381
                klog.Warningf("throttlingCache(%s) return with error: %s", consts.GetDiskThrottlingKey, err)
×
382
                return false
×
383
        }
×
384
        return cache != nil
23✔
385
}
386

387
func (d *Driver) isCheckDiskLunThrottled(ctx context.Context) bool {
×
388
        cache, err := d.checkDiskLunThrottlingCache.Get(ctx, consts.CheckDiskLunThrottlingKey, azcache.CacheReadTypeDefault)
×
389
        if err != nil {
×
390
                klog.Warningf("throttlingCache(%s) return with error: %s", consts.CheckDiskLunThrottlingKey, err)
×
391
                return false
×
392
        }
×
393
        return cache != nil
×
394
}
395

396
func (d *Driver) checkDiskExists(ctx context.Context, diskURI string) (*armcompute.Disk, error) {
15✔
397
        if d.isGetDiskThrottled(ctx) {
16✔
398
                klog.Warningf("skip checkDiskExists(%s) since it's still in throttling", diskURI)
1✔
399
                return nil, nil
1✔
400
        }
1✔
401

402
        newCtx, cancel := context.WithTimeout(ctx, time.Duration(d.getDiskTimeoutInSeconds)*time.Second)
14✔
403
        defer cancel()
14✔
404
        return d.diskController.GetDiskByURI(newCtx, diskURI)
14✔
405
}
406

407
func (d *Driver) checkDiskCapacity(ctx context.Context, subsID, resourceGroup, diskName string, requestGiB int) (bool, error) {
3✔
408
        if d.isGetDiskThrottled(ctx) {
4✔
409
                klog.Warningf("skip checkDiskCapacity(%s, %s) since it's still in throttling", resourceGroup, diskName)
1✔
410
                return true, nil
1✔
411
        }
1✔
412
        disk, err := d.diskController.GetDisk(ctx, subsID, resourceGroup, diskName)
2✔
413
        // Because we can not judge the reason of the error. Maybe the disk does not exist.
2✔
414
        // So here we do not handle the error.
2✔
415
        if err == nil {
4✔
416
                if !reflect.DeepEqual(disk, armcompute.Disk{}) && disk.Properties.DiskSizeGB != nil && int(*disk.Properties.DiskSizeGB) != requestGiB {
3✔
417
                        return false, status.Errorf(codes.AlreadyExists, "the request volume already exists, but its capacity(%v) is different from (%v)", *disk.Properties.DiskSizeGB, requestGiB)
1✔
418
                }
1✔
419
        }
420
        return true, nil
1✔
421
}
422

423
func (d *Driver) getVolumeLocks() *volumehelper.VolumeLocks {
4✔
424
        return d.volumeLocks
4✔
425
}
4✔
426

427
// setControllerCapabilities sets the controller capabilities field. It is intended for use with unit tests.
428
func (d *DriverCore) setControllerCapabilities(caps []*csi.ControllerServiceCapability) {
3✔
429
        d.Cap = caps
3✔
430
}
3✔
431

432
// setNodeCapabilities sets the node capabilities field. It is intended for use with unit tests.
433
func (d *DriverCore) setNodeCapabilities(nodeCaps []*csi.NodeServiceCapability) {
1✔
434
        d.NSCap = nodeCaps
1✔
435
}
1✔
436

437
// setName sets the Name field. It is intended for use with unit tests.
438
func (d *DriverCore) setName(name string) {
1✔
439
        d.Name = name
1✔
440
}
1✔
441

442
// setName sets the NodeId field. It is intended for use with unit tests.
443
func (d *DriverCore) setNodeID(nodeID string) {
1✔
444
        d.NodeID = nodeID
1✔
445
}
1✔
446

447
// setName sets the Version field. It is intended for use with unit tests.
448
func (d *DriverCore) setVersion(version string) {
1✔
449
        d.Version = version
1✔
450
}
1✔
451

452
// getCloud returns the value of the cloud field. It is intended for use with unit tests.
453
func (d *DriverCore) getCloud() *azure.Cloud {
27✔
454
        return d.cloud
27✔
455
}
27✔
456

457
// setCloud sets the cloud field. It is intended for use with unit tests.
458
func (d *DriverCore) setCloud(cloud *azure.Cloud) {
22✔
459
        d.cloud = cloud
22✔
460
}
22✔
461

462
// getMounter returns the value of the mounter field. It is intended for use with unit tests.
463
func (d *DriverCore) getMounter() *mount.SafeFormatAndMount {
4✔
464
        return d.mounter
4✔
465
}
4✔
466

467
// setMounter sets the mounter field. It is intended for use with unit tests.
468
func (d *DriverCore) setMounter(mounter *mount.SafeFormatAndMount) {
28✔
469
        d.mounter = mounter
28✔
470
}
28✔
471

472
// getPerfOptimizationEnabled returns the value of the perfOptimizationEnabled field. It is intended for use with unit tests.
473
func (d *DriverCore) getPerfOptimizationEnabled() bool {
19✔
474
        return d.perfOptimizationEnabled
19✔
475
}
19✔
476

477
// setPerfOptimizationEnabled sets the value of the perfOptimizationEnabled field. It is intended for use with unit tests.
478
func (d *DriverCore) setPerfOptimizationEnabled(enabled bool) {
9✔
479
        d.perfOptimizationEnabled = enabled
9✔
480
}
9✔
481

482
// getDeviceHelper returns the value of the deviceHelper field. It is intended for use with unit tests.
483
func (d *DriverCore) getDeviceHelper() optimization.Interface {
8✔
484
        return d.deviceHelper
8✔
485
}
8✔
486

487
// getNodeInfo returns the value of the nodeInfo field. It is intended for use with unit tests.
488
func (d *DriverCore) getNodeInfo() *optimization.NodeInfo {
2✔
489
        return d.nodeInfo
2✔
490
}
2✔
491

492
func (d *DriverCore) getHostUtil() hostUtil {
11✔
493
        return d.hostUtil
11✔
494
}
11✔
495

496
// getSnapshotCompletionPercent returns the completion percent of snapshot
497
func (d *DriverCore) getSnapshotCompletionPercent(ctx context.Context, subsID, resourceGroup, snapshotName string) (float32, error) {
28✔
498
        snapshotClient, err := d.clientFactory.GetSnapshotClientForSub(subsID)
28✔
499
        if err != nil {
28✔
500
                return 0.0, err
×
501
        }
×
502
        copySnapshot, err := snapshotClient.Get(ctx, resourceGroup, snapshotName)
28✔
503
        if err != nil {
31✔
504
                return 0.0, err
3✔
505
        }
3✔
506

507
        if copySnapshot.Properties == nil || copySnapshot.Properties.CompletionPercent == nil {
38✔
508
                // If CompletionPercent is nil, it means the snapshot is complete
13✔
509
                klog.V(2).Infof("snapshot(%s) under rg(%s) has no SnapshotProperties or CompletionPercent is nil", snapshotName, resourceGroup)
13✔
510
                return 100.0, nil
13✔
511
        }
13✔
512

513
        return *copySnapshot.Properties.CompletionPercent, nil
12✔
514
}
515

516
// waitForSnapshotReady wait for completionPercent of snapshot is 100.0
517
func (d *DriverCore) waitForSnapshotReady(ctx context.Context, subsID, resourceGroup, snapshotName string, intervel, timeout time.Duration) error {
18✔
518
        completionPercent, err := d.getSnapshotCompletionPercent(ctx, subsID, resourceGroup, snapshotName)
18✔
519
        if err != nil {
21✔
520
                return err
3✔
521
        }
3✔
522

523
        if completionPercent >= float32(100.0) {
29✔
524
                klog.V(2).Infof("snapshot(%s) under rg(%s) complete", snapshotName, resourceGroup)
14✔
525
                return nil
14✔
526
        }
14✔
527

528
        timeTick := time.Tick(intervel)
1✔
529
        timeAfter := time.After(timeout)
1✔
530
        for {
12✔
531
                select {
11✔
532
                case <-timeTick:
10✔
533
                        completionPercent, err = d.getSnapshotCompletionPercent(ctx, subsID, resourceGroup, snapshotName)
10✔
534
                        if err != nil {
10✔
535
                                return err
×
536
                        }
×
537

538
                        if completionPercent >= float32(100.0) {
10✔
539
                                klog.V(2).Infof("snapshot(%s) under rg(%s) complete", snapshotName, resourceGroup)
×
540
                                return nil
×
541
                        }
×
542
                        klog.V(2).Infof("snapshot(%s) under rg(%s) completionPercent: %f", snapshotName, resourceGroup, completionPercent)
10✔
543
                case <-timeAfter:
1✔
544
                        return fmt.Errorf("timeout waiting for snapshot(%s) under rg(%s)", snapshotName, resourceGroup)
1✔
545
                }
546
        }
547
}
548

549
// getUsedLunsFromVolumeAttachments returns a list of used luns from VolumeAttachments
550
func (d *DriverCore) getUsedLunsFromVolumeAttachments(ctx context.Context, nodeName string) ([]int, error) {
1✔
551
        kubeClient := d.cloud.KubeClient
1✔
552
        if kubeClient == nil || kubeClient.StorageV1() == nil || kubeClient.StorageV1().VolumeAttachments() == nil {
2✔
553
                return nil, fmt.Errorf("kubeClient or kubeClient.StorageV1() or kubeClient.StorageV1().VolumeAttachments() is nil")
1✔
554
        }
1✔
555

556
        volumeAttachments, err := kubeClient.StorageV1().VolumeAttachments().List(ctx, metav1.ListOptions{
×
557
                TimeoutSeconds: ptr.To(int64(2))})
×
558
        if err != nil {
×
559
                return nil, err
×
560
        }
×
561

562
        usedLuns := make([]int, 0)
×
563
        if volumeAttachments == nil {
×
564
                klog.V(2).Infof("volumeAttachments is nil")
×
565
                return usedLuns, nil
×
566
        }
×
567

568
        klog.V(2).Infof("volumeAttachments count: %d, nodeName: %s", len(volumeAttachments.Items), nodeName)
×
569
        for _, va := range volumeAttachments.Items {
×
570
                klog.V(6).Infof("attacher: %s, nodeName: %s, Status: %v, PV: %s, attachmentMetadata: %v", va.Spec.Attacher, va.Spec.NodeName,
×
571
                        va.Status.Attached, ptr.Deref(va.Spec.Source.PersistentVolumeName, ""), va.Status.AttachmentMetadata)
×
572
                if va.Spec.Attacher == d.Name && strings.EqualFold(va.Spec.NodeName, nodeName) && va.Status.Attached {
×
573
                        if k, ok := va.Status.AttachmentMetadata[consts.LUN]; ok {
×
574
                                lun, err := strconv.Atoi(k)
×
575
                                if err != nil {
×
576
                                        klog.Warningf("VolumeAttachment(%s) lun(%s) is not a valid integer", va.Name, k)
×
577
                                        continue
×
578
                                }
579
                                usedLuns = append(usedLuns, lun)
×
580
                        }
581
                }
582
        }
583
        return usedLuns, nil
×
584
}
585

586
// getUsedLunsFromNode returns a list of sorted used luns from Node
587
func (d *DriverCore) getUsedLunsFromNode(ctx context.Context, nodeName types.NodeName) ([]int, error) {
1✔
588
        disks, _, err := d.diskController.GetNodeDataDisks(ctx, nodeName, azcache.CacheReadTypeDefault)
1✔
589
        if err != nil {
1✔
590
                klog.Errorf("error of getting data disks for node %s: %v", nodeName, err)
×
591
                return nil, err
×
592
        }
×
593

594
        usedLuns := make([]int, 0)
1✔
595
        // get all disks attached to the node
1✔
596
        for _, disk := range disks {
3✔
597
                if disk.Lun == nil {
2✔
598
                        klog.Warningf("disk(%s) lun is nil", *disk.Name)
×
599
                        continue
×
600
                }
601
                usedLuns = append(usedLuns, int(*disk.Lun))
2✔
602
        }
603
        return usedLuns, nil
1✔
604
}
605

606
// getNodeInfoFromLabels get zone, instanceType from node labels
607
func getNodeInfoFromLabels(ctx context.Context, nodeName string, kubeClient clientset.Interface) (string, string, error) {
2✔
608
        if kubeClient == nil || kubeClient.CoreV1() == nil {
4✔
609
                return "", "", fmt.Errorf("kubeClient is nil")
2✔
610
        }
2✔
611

612
        node, err := kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
×
613
        if err != nil {
×
614
                return "", "", fmt.Errorf("get node(%s) failed with %v", nodeName, err)
×
615
        }
×
616

617
        if len(node.Labels) == 0 {
×
618
                return "", "", fmt.Errorf("node(%s) label is empty", nodeName)
×
619
        }
×
620
        return node.Labels[consts.WellKnownTopologyKey], node.Labels[consts.InstanceTypeKey], nil
×
621
}
622

623
// getDefaultDiskIOPSReadWrite according to requestGiB
624
//
625
//        ref: https://docs.microsoft.com/en-us/azure/virtual-machines/disks-types#ultra-disk-iops
626
func getDefaultDiskIOPSReadWrite(requestGiB int) int {
9✔
627
        iops := azurecloudconsts.DefaultDiskIOPSReadWrite
9✔
628
        if requestGiB > iops {
16✔
629
                iops = requestGiB
7✔
630
        }
7✔
631
        if iops > 160000 {
12✔
632
                iops = 160000
3✔
633
        }
3✔
634
        return iops
9✔
635
}
636

637
// getDefaultDiskMBPSReadWrite according to requestGiB
638
//
639
//        ref: https://docs.microsoft.com/en-us/azure/virtual-machines/disks-types#ultra-disk-throughput
640
func getDefaultDiskMBPSReadWrite(requestGiB int) int {
6✔
641
        bandwidth := azurecloudconsts.DefaultDiskMBpsReadWrite
6✔
642
        iops := getDefaultDiskIOPSReadWrite(requestGiB)
6✔
643
        if iops/256 > bandwidth {
10✔
644
                bandwidth = int(volumehelper.RoundUpSize(int64(iops), 256))
4✔
645
        }
4✔
646
        if bandwidth > iops/4 {
6✔
647
                bandwidth = int(volumehelper.RoundUpSize(int64(iops), 4))
×
648
        }
×
649
        if bandwidth > 4000 {
6✔
650
                bandwidth = 4000
×
651
        }
×
652
        return bandwidth
6✔
653
}
654

655
// getVMSSInstanceName converts a VMSS compute name into the matching instance
// name, e.g. "aks-agentpool-20657377-vmss_2" -> "aks-agentpool-20657377-vmss000002".
//
// computeName must consist of a prefix and a decimal instance ID separated by
// exactly one underscore. The instance ID is re-encoded in base 36, left-padded
// with zeros to six characters, and appended to the prefix. An error is
// returned when the name does not contain exactly one underscore, when the
// instance ID is not an integer, or when it is negative.
func getVMSSInstanceName(computeName string) (string, error) {
	names := strings.Split(computeName, "_")
	if len(names) != 2 {
		return "", fmt.Errorf("invalid vmss compute name: %s", computeName)
	}

	instanceID, err := strconv.Atoi(names[1])
	if err != nil {
		return "", fmt.Errorf("parsing vmss compute name(%s) failed with %w", computeName, err)
	}
	// Atoi accepts a leading minus sign; a negative ID would otherwise yield a
	// malformed name such as "vmss0000-1".
	if instanceID < 0 {
		return "", fmt.Errorf("invalid vmss compute name(%s): negative instance id %d", computeName, instanceID)
	}

	// Pad explicitly instead of relying on the '0' flag of a string verb.
	const suffixWidth = 6
	suffix := strconv.FormatInt(int64(instanceID), 36)
	if pad := suffixWidth - len(suffix); pad > 0 {
		suffix = strings.Repeat("0", pad) + suffix
	}
	return names[0] + suffix, nil
}
668

669
// JSONPatch is one operation of a JSON patch document in its serialized form.
type JSONPatch struct {
	// OP is the operation name, e.g. "test" or "replace"; omitted from the
	// JSON output when empty.
	OP string `json:"op,omitempty"`
	// Path is the location the operation applies to, e.g. "/spec/taints";
	// omitted from the JSON output when empty.
	Path string `json:"path,omitempty"`
	// Value is the operand of the operation; it is always serialized, even
	// when nil.
	Value any `json:"value"`
}
675

676
// removeTaintInBackground is a goroutine that retries removeNotReadyTaint with exponential backoff
677
func removeTaintInBackground(k8sClient kubernetes.Interface, nodeName, driverName string, backoff wait.Backoff, removalFunc func(kubernetes.Interface, string, string) error) {
×
678
        backoffErr := wait.ExponentialBackoff(backoff, func() (bool, error) {
×
679
                err := removalFunc(k8sClient, nodeName, driverName)
×
680
                if err != nil {
×
681
                        klog.ErrorS(err, "Unexpected failure when attempting to remove node taint(s)")
×
682
                        return false, nil
×
683
                }
×
684
                return true, nil
×
685
        })
686

687
        if backoffErr != nil {
×
688
                klog.ErrorS(backoffErr, "Retries exhausted, giving up attempting to remove node taint(s)")
×
689
        }
×
690
}
691

692
// removeNotReadyTaint removes the taint disk.csi.azure.com/agent-not-ready from the local node
693
// This taint can be optionally applied by users to prevent startup race conditions such as
694
// https://github.com/kubernetes/kubernetes/issues/95911
695
func removeNotReadyTaint(clientset kubernetes.Interface, nodeName, driverName string) error {
×
696
        ctx := context.Background()
×
697
        node, err := clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
×
698
        if err != nil {
×
699
                return err
×
700
        }
×
701

702
        if err := checkAllocatable(ctx, clientset, nodeName, driverName); err != nil {
×
703
                return err
×
704
        }
×
705

706
        taintKeyToRemove := driverName + consts.AgentNotReadyNodeTaintKeySuffix
×
707
        klog.V(2).Infof("removing taint with key %s from local node %s", taintKeyToRemove, nodeName)
×
708
        var taintsToKeep []corev1.Taint
×
709
        for _, taint := range node.Spec.Taints {
×
710
                klog.V(5).Infof("checking taint key %s, value %s, effect %s", taint.Key, taint.Value, taint.Effect)
×
711
                if taint.Key != taintKeyToRemove {
×
712
                        taintsToKeep = append(taintsToKeep, taint)
×
713
                } else {
×
714
                        klog.V(2).Infof("queued taint for removal with key %s, effect %s", taint.Key, taint.Effect)
×
715
                }
×
716
        }
717

718
        if len(taintsToKeep) == len(node.Spec.Taints) {
×
719
                klog.V(2).Infof("No taints to remove on node, skipping taint removal")
×
720
                return nil
×
721
        }
×
722

723
        patchRemoveTaints := []JSONPatch{
×
724
                {
×
725
                        OP:    "test",
×
726
                        Path:  "/spec/taints",
×
727
                        Value: node.Spec.Taints,
×
728
                },
×
729
                {
×
730
                        OP:    "replace",
×
731
                        Path:  "/spec/taints",
×
732
                        Value: taintsToKeep,
×
733
                },
×
734
        }
×
735

×
736
        patch, err := json.Marshal(patchRemoveTaints)
×
737
        if err != nil {
×
738
                return err
×
739
        }
×
740

741
        _, err = clientset.CoreV1().Nodes().Patch(ctx, nodeName, k8stypes.JSONPatchType, patch, metav1.PatchOptions{})
×
742
        if err != nil {
×
743
                return err
×
744
        }
×
745
        klog.V(2).Infof("removed taint with key %s from local node %s successfully", taintKeyToRemove, nodeName)
×
746
        return nil
×
747
}
748

749
func checkAllocatable(ctx context.Context, clientset kubernetes.Interface, nodeName, driverName string) error {
×
750
        csiNode, err := clientset.StorageV1().CSINodes().Get(ctx, nodeName, metav1.GetOptions{})
×
751
        if err != nil {
×
752
                return fmt.Errorf("isAllocatableSet: failed to get CSINode for %s: %w", nodeName, err)
×
753
        }
×
754

755
        for _, driver := range csiNode.Spec.Drivers {
×
756
                if driver.Name == driverName {
×
757
                        if driver.Allocatable != nil && driver.Allocatable.Count != nil {
×
758
                                klog.V(2).Infof("CSINode Allocatable value is set for driver on node %s, count %d", nodeName, *driver.Allocatable.Count)
×
759
                                return nil
×
760
                        }
×
761
                        return fmt.Errorf("isAllocatableSet: allocatable value not set for driver on node %s", nodeName)
×
762
                }
763
        }
764

765
        return fmt.Errorf("isAllocatableSet: driver not found on node %s", nodeName)
×
766
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc