• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubernetes-sigs / azuredisk-csi-driver / 13044734068

30 Jan 2025 02:52AM UTC coverage: 76.873% (+0.06%) from 76.813%
13044734068

Pull #2834

github

andyzhangx
chore: upgrade CSI driver sidecar image versions
Pull Request #2834: chore: upgrade CSI driver sidecar image versions

3879 of 5046 relevant lines covered (76.87%)

6.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.13
/pkg/azuredisk/azuredisk.go
1
/*
2
Copyright 2017 The Kubernetes Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package azuredisk
18

19
import (
20
        "context"
21
        "encoding/json"
22
        "errors"
23
        "fmt"
24
        "reflect"
25
        "strconv"
26
        "strings"
27
        "time"
28

29
        "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/compute/armcompute/v6"
30
        "github.com/container-storage-interface/spec/lib/go/csi"
31
        "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
32

33
        "google.golang.org/grpc"
34
        "google.golang.org/grpc/codes"
35
        "google.golang.org/grpc/status"
36

37
        grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
38
        corev1 "k8s.io/api/core/v1"
39
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
40
        "k8s.io/apimachinery/pkg/types"
41
        k8stypes "k8s.io/apimachinery/pkg/types"
42
        "k8s.io/apimachinery/pkg/util/wait"
43
        "k8s.io/client-go/kubernetes"
44
        clientset "k8s.io/client-go/kubernetes"
45
        "k8s.io/klog/v2"
46
        "k8s.io/kubernetes/pkg/volume/util/hostutil"
47
        "k8s.io/mount-utils"
48
        "k8s.io/utils/ptr"
49

50
        consts "sigs.k8s.io/azuredisk-csi-driver/pkg/azureconstants"
51
        "sigs.k8s.io/azuredisk-csi-driver/pkg/azureutils"
52
        csicommon "sigs.k8s.io/azuredisk-csi-driver/pkg/csi-common"
53
        "sigs.k8s.io/azuredisk-csi-driver/pkg/mounter"
54
        "sigs.k8s.io/azuredisk-csi-driver/pkg/optimization"
55
        volumehelper "sigs.k8s.io/azuredisk-csi-driver/pkg/util"
56
        "sigs.k8s.io/cloud-provider-azure/pkg/azclient"
57
        azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache"
58
        azurecloudconsts "sigs.k8s.io/cloud-provider-azure/pkg/consts"
59
        azure "sigs.k8s.io/cloud-provider-azure/pkg/provider"
60
)
61

62
var (
63
        // taintRemovalInitialDelay is the initial delay for node taint removal.
        // newDriverV1 passes it to time.AfterFunc before the background removal starts.
64
        taintRemovalInitialDelay = 1 * time.Second
65
        // taintRemovalBackoff is the exponential backoff configuration for node taint removal.
        // It is handed to removeTaintInBackground by newDriverV1.
66
        taintRemovalBackoff = wait.Backoff{
67
                Duration: 500 * time.Millisecond,
68
                Factor:   2,
69
                Steps:    10, // Max delay = 0.5 * 2^9 = ~4 minutes
70
        }
71
)
72

73
// CSIDriver defines the interface for a CSI driver: the three CSI gRPC
// services (controller, node, identity) plus a Run entry point.
74
type CSIDriver interface {
75
        csi.ControllerServer
76
        csi.NodeServer
77
        csi.IdentityServer
78

79
        // Run starts the driver. In the Driver implementation in this file it
        // serves gRPC on the configured endpoint and stops gracefully once ctx is done.
        Run(ctx context.Context) error
80
}
81

82
// hostUtil is the small host-utility surface the driver depends on.
// newDriverV1 fills it with hostutil.NewHostUtil().
type hostUtil interface {
83
        // PathIsDevice reports whether the given path is a device.
        // NOTE(review): exact semantics come from the hostutil implementation — confirm there.
        PathIsDevice(string) (bool, error)
84
}
85

86
// DriverCore contains fields common to both the V1 and V2 driver, and implements all interfaces of CSI drivers.
// In this file, newDriverV1 copies most of the configuration fields below from DriverOptions.
87
type DriverCore struct {
88
        csicommon.CSIDriver
89
        // Embed UnimplementedXXXServer to ensure the driver returns Unimplemented for any
90
        // new RPC methods that might be introduced in future versions of the spec.
91
        csi.UnimplementedControllerServer
92
        csi.UnimplementedIdentityServer
93
        csi.UnimplementedNodeServer
94

95
        perfOptimizationEnabled      bool
96
        cloudConfigSecretName        string
97
        cloudConfigSecretNamespace   string
98
        customUserAgent              string
99
        userAgentSuffix              string
100
        cloud                        *azure.Cloud
101
        clientFactory                azclient.ClientFactory
102
        diskController               *ManagedDiskController
103
        mounter                      *mount.SafeFormatAndMount
104
        deviceHelper                 optimization.Interface
105
        nodeInfo                     *optimization.NodeInfo
106
        ioHandler                    azureutils.IOHandler
107
        hostUtil                     hostUtil
108
        useCSIProxyGAInterface       bool
109
        enableDiskOnlineResize       bool
110
        allowEmptyCloudConfig        bool
111
        enableListVolumes            bool
112
        enableListSnapshots          bool
113
        supportZone                  bool
114
        getNodeInfoFromLabels        bool
115
        enableDiskCapacityCheck      bool
116
        disableUpdateCache           bool
117
        enableTrafficManager         bool
118
        trafficManagerPort           int64
119
        vmssCacheTTLInSeconds        int64
120
        volStatsCacheExpireInMinutes int64
121
        attachDetachInitialDelayInMs int64
122
        vmType                       string
123
        enableWindowsHostProcess     bool
124
        listDisksUsingWinCIM         bool
125
        getNodeIDFromIMDS            bool
126
        enableOtelTracing            bool
127
        shouldWaitForSnapshotReady   bool
128
        checkDiskLUNCollision        bool
129
        forceDetachBackoff           bool
130
        // endpoint is the address Driver.Run passes to csicommon.Listen.
        endpoint                     string
131
        disableAVSetNodes            bool
132
        // removeNotReadyTaint enables the background node taint removal scheduled by newDriverV1.
        removeNotReadyTaint          bool
133
        kubeClient                   kubernetes.Interface
134
        // a timed cache storing volume stats <volumeID, volumeStats>
135
        volStatsCache           azcache.Resource
136
        maxConcurrentFormat     int64
137
        concurrentFormatTimeout int64
138
}
139

140
// Driver is the v1 implementation of the Azure Disk CSI Driver.
// It embeds DriverCore and adds the volume locks and throttling caches created by newDriverV1.
141
type Driver struct {
142
        DriverCore
143
        // volumeLocks is created with volumehelper.NewVolumeLocks() in newDriverV1.
        volumeLocks *volumehelper.VolumeLocks
144
        // a timed cache for throttling (newDriverV1 creates it with a 5 minute TTL)
145
        throttlingCache azcache.Resource
146
        // a timed cache for disk lun collision check throttling (newDriverV1 creates it with a 30 minute TTL)
147
        checkDiskLunThrottlingCache azcache.Resource
148
}
149

150
// newDriverV1 Creates a NewCSIDriver object. Assumes vendor version is equal to driver version &
151
// does not support optional driver plugin info manifest field. Refer to CSI spec for more details.
152
func newDriverV1(options *DriverOptions) *Driver {
3✔
153
        driver := Driver{}
3✔
154
        driver.Name = options.DriverName
3✔
155
        driver.Version = driverVersion
3✔
156
        driver.NodeID = options.NodeID
3✔
157
        driver.VolumeAttachLimit = options.VolumeAttachLimit
3✔
158
        driver.ReservedDataDiskSlotNum = options.ReservedDataDiskSlotNum
3✔
159
        driver.perfOptimizationEnabled = options.EnablePerfOptimization
3✔
160
        driver.cloudConfigSecretName = options.CloudConfigSecretName
3✔
161
        driver.cloudConfigSecretNamespace = options.CloudConfigSecretNamespace
3✔
162
        driver.customUserAgent = options.CustomUserAgent
3✔
163
        driver.userAgentSuffix = options.UserAgentSuffix
3✔
164
        driver.useCSIProxyGAInterface = options.UseCSIProxyGAInterface
3✔
165
        driver.enableDiskOnlineResize = options.EnableDiskOnlineResize
3✔
166
        driver.allowEmptyCloudConfig = options.AllowEmptyCloudConfig
3✔
167
        driver.enableListVolumes = options.EnableListVolumes
3✔
168
        driver.enableListSnapshots = options.EnableListVolumes
3✔
169
        driver.supportZone = options.SupportZone
3✔
170
        driver.getNodeInfoFromLabels = options.GetNodeInfoFromLabels
3✔
171
        driver.enableDiskCapacityCheck = options.EnableDiskCapacityCheck
3✔
172
        driver.disableUpdateCache = options.DisableUpdateCache
3✔
173
        driver.attachDetachInitialDelayInMs = options.AttachDetachInitialDelayInMs
3✔
174
        driver.enableTrafficManager = options.EnableTrafficManager
3✔
175
        driver.trafficManagerPort = options.TrafficManagerPort
3✔
176
        driver.vmssCacheTTLInSeconds = options.VMSSCacheTTLInSeconds
3✔
177
        driver.volStatsCacheExpireInMinutes = options.VolStatsCacheExpireInMinutes
3✔
178
        driver.vmType = options.VMType
3✔
179
        driver.enableWindowsHostProcess = options.EnableWindowsHostProcess
3✔
180
        driver.listDisksUsingWinCIM = options.ListDisksUsingWinCIM
3✔
181
        driver.getNodeIDFromIMDS = options.GetNodeIDFromIMDS
3✔
182
        driver.enableOtelTracing = options.EnableOtelTracing
3✔
183
        driver.shouldWaitForSnapshotReady = options.WaitForSnapshotReady
3✔
184
        driver.checkDiskLUNCollision = options.CheckDiskLUNCollision
3✔
185
        driver.forceDetachBackoff = options.ForceDetachBackoff
3✔
186
        driver.endpoint = options.Endpoint
3✔
187
        driver.disableAVSetNodes = options.DisableAVSetNodes
3✔
188
        driver.removeNotReadyTaint = options.RemoveNotReadyTaint
3✔
189
        driver.maxConcurrentFormat = options.MaxConcurrentFormat
3✔
190
        driver.concurrentFormatTimeout = options.ConcurrentFormatTimeout
3✔
191
        driver.volumeLocks = volumehelper.NewVolumeLocks()
3✔
192
        driver.ioHandler = azureutils.NewOSIOHandler()
3✔
193
        driver.hostUtil = hostutil.NewHostUtil()
3✔
194
        if driver.NodeID == "" {
6✔
195
                // nodeid is not needed in controller component
3✔
196
                klog.Warning("nodeid is empty")
3✔
197
        }
3✔
198
        topologyKey = fmt.Sprintf("topology.%s/zone", driver.Name)
3✔
199

3✔
200
        getter := func(_ context.Context, _ string) (interface{}, error) { return nil, nil }
3✔
201
        var err error
3✔
202
        if driver.throttlingCache, err = azcache.NewTimedCache(5*time.Minute, getter, false); err != nil {
3✔
203
                klog.Fatalf("%v", err)
×
204
        }
×
205
        if driver.checkDiskLunThrottlingCache, err = azcache.NewTimedCache(30*time.Minute, getter, false); err != nil {
3✔
206
                klog.Fatalf("%v", err)
×
207
        }
×
208

209
        if options.VolStatsCacheExpireInMinutes <= 0 {
6✔
210
                options.VolStatsCacheExpireInMinutes = 10 // default expire in 10 minutes
3✔
211
        }
3✔
212
        if driver.volStatsCache, err = azcache.NewTimedCache(time.Duration(options.VolStatsCacheExpireInMinutes)*time.Minute, getter, false); err != nil {
3✔
213
                klog.Fatalf("%v", err)
×
214
        }
×
215

216
        userAgent := GetUserAgent(driver.Name, driver.customUserAgent, driver.userAgentSuffix)
3✔
217
        klog.V(2).Infof("driver userAgent: %s", userAgent)
3✔
218

3✔
219
        kubeClient, err := azureutils.GetKubeClient(options.Kubeconfig)
3✔
220
        if err != nil {
6✔
221
                klog.Warningf("get kubeconfig(%s) failed with error: %v", options.Kubeconfig, err)
3✔
222
        }
3✔
223
        driver.kubeClient = kubeClient
3✔
224

3✔
225
        cloud, err := azureutils.GetCloudProviderFromClient(context.Background(), kubeClient, driver.cloudConfigSecretName, driver.cloudConfigSecretNamespace,
3✔
226
                userAgent, driver.allowEmptyCloudConfig, driver.enableTrafficManager, driver.trafficManagerPort)
3✔
227
        if err != nil {
3✔
228
                klog.Fatalf("failed to get Azure Cloud Provider, error: %v", err)
×
229
        }
×
230
        driver.cloud = cloud
3✔
231

3✔
232
        if driver.cloud != nil {
6✔
233
                driver.clientFactory = driver.cloud.ComputeClientFactory
3✔
234
                if driver.vmType != "" {
5✔
235
                        klog.V(2).Infof("override VMType(%s) in cloud config as %s", driver.cloud.VMType, driver.vmType)
2✔
236
                        driver.cloud.VMType = driver.vmType
2✔
237
                }
2✔
238

239
                if driver.NodeID == "" {
6✔
240
                        // Disable UseInstanceMetadata for controller to mitigate a timeout issue using IMDS
3✔
241
                        // https://github.com/kubernetes-sigs/azuredisk-csi-driver/issues/168
3✔
242
                        klog.V(2).Infof("disable UseInstanceMetadata for controller")
3✔
243
                        driver.cloud.Config.UseInstanceMetadata = false
3✔
244

3✔
245
                        if driver.cloud.VMType == azurecloudconsts.VMTypeStandard && driver.cloud.DisableAvailabilitySetNodes {
3✔
246
                                klog.V(2).Infof("set DisableAvailabilitySetNodes as false since VMType is %s", driver.cloud.VMType)
×
247
                                driver.cloud.DisableAvailabilitySetNodes = false
×
248
                        }
×
249

250
                        if driver.cloud.VMType == azurecloudconsts.VMTypeVMSS && !driver.cloud.DisableAvailabilitySetNodes && driver.disableAVSetNodes {
3✔
251
                                klog.V(2).Infof("DisableAvailabilitySetNodes for controller since current VMType is vmss")
×
252
                                driver.cloud.DisableAvailabilitySetNodes = true
×
253
                        }
×
254
                        klog.V(2).Infof("cloud: %s, location: %s, rg: %s, VMType: %s, PrimaryScaleSetName: %s, PrimaryAvailabilitySetName: %s, DisableAvailabilitySetNodes: %v", driver.cloud.Cloud, driver.cloud.Location, driver.cloud.ResourceGroup, driver.cloud.VMType, driver.cloud.PrimaryScaleSetName, driver.cloud.PrimaryAvailabilitySetName, driver.cloud.DisableAvailabilitySetNodes)
3✔
255
                }
256

257
                if driver.vmssCacheTTLInSeconds > 0 {
5✔
258
                        klog.V(2).Infof("reset vmssCacheTTLInSeconds as %d", driver.vmssCacheTTLInSeconds)
2✔
259
                        driver.cloud.VMCacheTTLInSeconds = int(driver.vmssCacheTTLInSeconds)
2✔
260
                        driver.cloud.VmssCacheTTLInSeconds = int(driver.vmssCacheTTLInSeconds)
2✔
261
                }
2✔
262

263
                driver.diskController = NewManagedDiskController(driver.cloud)
3✔
264
                driver.diskController.DisableUpdateCache = driver.disableUpdateCache
3✔
265
                driver.diskController.AttachDetachInitialDelayInMs = int(driver.attachDetachInitialDelayInMs)
3✔
266
                driver.diskController.ForceDetachBackoff = driver.forceDetachBackoff
3✔
267
        }
268

269
        driver.deviceHelper = optimization.NewSafeDeviceHelper()
3✔
270

3✔
271
        if driver.getPerfOptimizationEnabled() {
5✔
272
                driver.nodeInfo, err = optimization.NewNodeInfo(context.TODO(), driver.getCloud(), driver.NodeID)
2✔
273
                if err != nil {
4✔
274
                        klog.Warningf("Failed to get node info. Error: %v", err)
2✔
275
                }
2✔
276
        }
277

278
        driver.mounter, err = mounter.NewSafeMounter(driver.enableWindowsHostProcess, driver.listDisksUsingWinCIM, driver.useCSIProxyGAInterface, int(driver.maxConcurrentFormat), time.Duration(driver.concurrentFormatTimeout)*time.Second)
3✔
279
        if err != nil {
3✔
280
                klog.Fatalf("Failed to get safe mounter. Error: %v", err)
×
281
        }
×
282

283
        controllerCap := []csi.ControllerServiceCapability_RPC_Type{
3✔
284
                csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
3✔
285
                csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME,
3✔
286
                csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT,
3✔
287
                csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
3✔
288
                csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
3✔
289
                csi.ControllerServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER,
3✔
290
                csi.ControllerServiceCapability_RPC_MODIFY_VOLUME,
3✔
291
        }
3✔
292
        if driver.enableListVolumes {
5✔
293
                controllerCap = append(controllerCap, csi.ControllerServiceCapability_RPC_LIST_VOLUMES, csi.ControllerServiceCapability_RPC_LIST_VOLUMES_PUBLISHED_NODES)
2✔
294
        }
2✔
295
        if driver.enableListSnapshots {
5✔
296
                controllerCap = append(controllerCap, csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS)
2✔
297
        }
2✔
298

299
        driver.AddControllerServiceCapabilities(controllerCap)
3✔
300
        driver.AddVolumeCapabilityAccessModes(
3✔
301
                []csi.VolumeCapability_AccessMode_Mode{
3✔
302
                        csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
3✔
303
                        csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY,
3✔
304
                        csi.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER,
3✔
305
                        csi.VolumeCapability_AccessMode_SINGLE_NODE_MULTI_WRITER,
3✔
306
                })
3✔
307
        driver.AddNodeServiceCapabilities([]csi.NodeServiceCapability_RPC_Type{
3✔
308
                csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
3✔
309
                csi.NodeServiceCapability_RPC_EXPAND_VOLUME,
3✔
310
                csi.NodeServiceCapability_RPC_GET_VOLUME_STATS,
3✔
311
                csi.NodeServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER,
3✔
312
        })
3✔
313

3✔
314
        if kubeClient != nil && driver.removeNotReadyTaint && driver.NodeID != "" {
3✔
315
                // Remove taint from node to indicate driver startup success
×
316
                // This is done at the last possible moment to prevent race conditions or false positive removals
×
317
                time.AfterFunc(taintRemovalInitialDelay, func() {
×
318
                        removeTaintInBackground(kubeClient, driver.NodeID, driver.Name, taintRemovalBackoff, removeNotReadyTaint)
×
319
                })
×
320
        }
321
        return &driver
3✔
322
}
323

324
// Run driver initialization
325
func (d *Driver) Run(ctx context.Context) error {
5✔
326
        versionMeta, err := GetVersionYAML(d.Name)
5✔
327
        if err != nil {
5✔
328
                klog.Fatalf("%v", err)
×
329
        }
×
330
        klog.Infof("\nDRIVER INFORMATION:\n-------------------\n%s\n\nStreaming logs below:", versionMeta)
5✔
331

5✔
332
        opts := []grpc.ServerOption{
5✔
333
                grpc.ChainUnaryInterceptor(
5✔
334
                        grpcprom.NewServerMetrics().UnaryServerInterceptor(),
5✔
335
                        csicommon.LogGRPC,
5✔
336
                ),
5✔
337
        }
5✔
338
        if d.enableOtelTracing {
5✔
339
                exporter, err := InitOtelTracing()
×
340
                if err != nil {
×
341
                        klog.Fatalf("Failed to initialize otel tracing: %v", err)
×
342
                }
×
343
                // Exporter will flush traces on shutdown
344
                defer func() {
×
345
                        if err := exporter.Shutdown(context.Background()); err != nil {
×
346
                                klog.Errorf("Could not shutdown otel exporter: %v", err)
×
347
                        }
×
348
                }()
349
                opts = append(opts, grpc.StatsHandler(otelgrpc.NewServerHandler()))
×
350
        }
351

352
        s := grpc.NewServer(opts...)
5✔
353
        csi.RegisterIdentityServer(s, d)
5✔
354
        csi.RegisterControllerServer(s, d)
5✔
355
        csi.RegisterNodeServer(s, d)
5✔
356

5✔
357
        go func() {
10✔
358
                //graceful shutdown
5✔
359
                <-ctx.Done()
5✔
360
                s.GracefulStop()
5✔
361
        }()
5✔
362
        // Driver d act as IdentityServer, ControllerServer and NodeServer
363
        listener, err := csicommon.Listen(ctx, d.endpoint)
5✔
364
        if err != nil {
5✔
365
                klog.Fatalf("failed to listen to endpoint, error: %v", err)
×
366
        }
×
367
        err = s.Serve(listener)
5✔
368
        if errors.Is(err, grpc.ErrServerStopped) {
6✔
369
                klog.Infof("gRPC server stopped serving")
1✔
370
                return nil
1✔
371
        }
1✔
372
        return err
4✔
373
}
374

375
func (d *Driver) isGetDiskThrottled() bool {
18✔
376
        cache, err := d.throttlingCache.Get(context.Background(), consts.GetDiskThrottlingKey, azcache.CacheReadTypeDefault)
18✔
377
        if err != nil {
18✔
378
                klog.Warningf("throttlingCache(%s) return with error: %s", consts.GetDiskThrottlingKey, err)
×
379
                return false
×
380
        }
×
381
        return cache != nil
18✔
382
}
383

384
func (d *Driver) isCheckDiskLunThrottled() bool {
×
385
        cache, err := d.checkDiskLunThrottlingCache.Get(context.Background(), consts.CheckDiskLunThrottlingKey, azcache.CacheReadTypeDefault)
×
386
        if err != nil {
×
387
                klog.Warningf("throttlingCache(%s) return with error: %s", consts.CheckDiskLunThrottlingKey, err)
×
388
                return false
×
389
        }
×
390
        return cache != nil
×
391
}
392

393
func (d *Driver) checkDiskExists(ctx context.Context, diskURI string) (*armcompute.Disk, error) {
13✔
394
        diskName, err := azureutils.GetDiskName(diskURI)
13✔
395
        if err != nil {
16✔
396
                return nil, err
3✔
397
        }
3✔
398

399
        resourceGroup, err := azureutils.GetResourceGroupFromURI(diskURI)
10✔
400
        if err != nil {
10✔
401
                return nil, err
×
402
        }
×
403

404
        if d.isGetDiskThrottled() {
11✔
405
                klog.Warningf("skip checkDiskExists(%s) since it's still in throttling", diskURI)
1✔
406
                return nil, nil
1✔
407
        }
1✔
408
        subsID := azureutils.GetSubscriptionIDFromURI(diskURI)
9✔
409
        diskClient, err := d.diskController.clientFactory.GetDiskClientForSub(subsID)
9✔
410
        if err != nil {
9✔
411
                return nil, err
×
412
        }
×
413

414
        // get disk operation should timeout within 1min if it takes too long time
415
        newCtx, cancel := context.WithTimeout(ctx, time.Minute)
9✔
416
        defer cancel()
9✔
417

9✔
418
        disk, err := diskClient.Get(newCtx, resourceGroup, diskName)
9✔
419
        if err != nil {
9✔
420
                return nil, err
×
421
        }
×
422
        return disk, nil
9✔
423
}
424

425
func (d *Driver) checkDiskCapacity(ctx context.Context, subsID, resourceGroup, diskName string, requestGiB int) (bool, error) {
3✔
426
        if d.isGetDiskThrottled() {
4✔
427
                klog.Warningf("skip checkDiskCapacity(%s, %s) since it's still in throttling", resourceGroup, diskName)
1✔
428
                return true, nil
1✔
429
        }
1✔
430
        diskClient, err := d.clientFactory.GetDiskClientForSub(subsID)
2✔
431
        if err != nil {
2✔
432
                return false, err
×
433
        }
×
434
        disk, err := diskClient.Get(ctx, resourceGroup, diskName)
2✔
435
        // Because we can not judge the reason of the error. Maybe the disk does not exist.
2✔
436
        // So here we do not handle the error.
2✔
437
        if err == nil {
4✔
438
                if !reflect.DeepEqual(disk, armcompute.Disk{}) && disk.Properties.DiskSizeGB != nil && int(*disk.Properties.DiskSizeGB) != requestGiB {
3✔
439
                        return false, status.Errorf(codes.AlreadyExists, "the request volume already exists, but its capacity(%v) is different from (%v)", *disk.Properties.DiskSizeGB, requestGiB)
1✔
440
                }
1✔
441
        }
442
        return true, nil
1✔
443
}
444

445
// getVolumeLocks returns the value of the volumeLocks field.
func (d *Driver) getVolumeLocks() *volumehelper.VolumeLocks {
4✔
446
        return d.volumeLocks
4✔
447
}
4✔
448

449
// setControllerCapabilities sets the controller capabilities field (Cap),
// replacing any previous value. It is intended for use with unit tests.
450
func (d *DriverCore) setControllerCapabilities(caps []*csi.ControllerServiceCapability) {
3✔
451
        d.Cap = caps
3✔
452
}
3✔
453

454
// setNodeCapabilities sets the node capabilities field (NSCap), replacing any
// previous value. It is intended for use with unit tests.
455
func (d *DriverCore) setNodeCapabilities(nodeCaps []*csi.NodeServiceCapability) {
1✔
456
        d.NSCap = nodeCaps
1✔
457
}
1✔
458

459
// setName sets the Name field (the driver name). It is intended for use with unit tests.
460
func (d *DriverCore) setName(name string) {
1✔
461
        d.Name = name
1✔
462
}
1✔
463

464
// setNodeID sets the NodeID field. It is intended for use with unit tests.
465
func (d *DriverCore) setNodeID(nodeID string) {
1✔
466
        d.NodeID = nodeID
1✔
467
}
1✔
468

469
// setVersion sets the Version field. It is intended for use with unit tests.
470
func (d *DriverCore) setVersion(version string) {
1✔
471
        d.Version = version
1✔
472
}
1✔
473

474
// getCloud returns the value of the cloud field. Besides unit tests, it is
// called by newDriverV1 when building the node info.
475
func (d *DriverCore) getCloud() *azure.Cloud {
43✔
476
        return d.cloud
43✔
477
}
43✔
478

479
// setCloud sets the cloud field. It is intended for use with unit tests.
// Only the cloud field is replaced; clientFactory and diskController are left untouched.
480
func (d *DriverCore) setCloud(cloud *azure.Cloud) {
11✔
481
        d.cloud = cloud
11✔
482
}
11✔
483

484
// getMounter returns the value of the mounter field (nil until a mounter has
// been assigned). It is intended for use with unit tests.
485
func (d *DriverCore) getMounter() *mount.SafeFormatAndMount {
4✔
486
        return d.mounter
4✔
487
}
4✔
488

489
// setMounter sets the mounter field, replacing any previous mounter.
// It is intended for use with unit tests.
490
func (d *DriverCore) setMounter(mounter *mount.SafeFormatAndMount) {
28✔
491
        d.mounter = mounter
28✔
492
}
28✔
493

494
// getPerfOptimizationEnabled returns the value of the perfOptimizationEnabled field.
// Besides unit tests, it is called by newDriverV1 to decide whether to collect node info.
495
func (d *DriverCore) getPerfOptimizationEnabled() bool {
18✔
496
        return d.perfOptimizationEnabled
18✔
497
}
18✔
498

499
// setPerfOptimizationEnabled sets the value of the perfOptimizationEnabled field.
// It is intended for use with unit tests.
500
func (d *DriverCore) setPerfOptimizationEnabled(enabled bool) {
9✔
501
        d.perfOptimizationEnabled = enabled
9✔
502
}
9✔
503

504
// getDeviceHelper returns the value of the deviceHelper field, which newDriverV1
// initializes with optimization.NewSafeDeviceHelper(). It is intended for use with unit tests.
505
func (d *DriverCore) getDeviceHelper() optimization.Interface {
8✔
506
        return d.deviceHelper
8✔
507
}
8✔
508

509
// getNodeInfo returns the value of the nodeInfo field. newDriverV1 only assigns
// that field when perf optimization is enabled. It is intended for use with unit tests.
510
func (d *DriverCore) getNodeInfo() *optimization.NodeInfo {
2✔
511
        return d.nodeInfo
2✔
512
}
2✔
513

514
// getHostUtil returns the value of the hostUtil field.
func (d *DriverCore) getHostUtil() hostUtil {
11✔
515
        return d.hostUtil
11✔
516
}
11✔
517

518
// getSnapshotCompletionPercent returns the completion percent of snapshot
519
func (d *DriverCore) getSnapshotCompletionPercent(ctx context.Context, subsID, resourceGroup, snapshotName string) (float32, error) {
15✔
520
        snapshotClient, err := d.clientFactory.GetSnapshotClientForSub(subsID)
15✔
521
        if err != nil {
15✔
522
                return 0.0, err
×
523
        }
×
524
        copySnapshot, err := snapshotClient.Get(ctx, resourceGroup, snapshotName)
15✔
525
        if err != nil {
17✔
526
                return 0.0, err
2✔
527
        }
2✔
528

529
        if copySnapshot.Properties == nil || copySnapshot.Properties.CompletionPercent == nil {
14✔
530
                // If CompletionPercent is nil, it means the snapshot is complete
1✔
531
                klog.V(2).Infof("snapshot(%s) under rg(%s) has no SnapshotProperties or CompletionPercent is nil", snapshotName, resourceGroup)
1✔
532
                return 100.0, nil
1✔
533
        }
1✔
534

535
        return *copySnapshot.Properties.CompletionPercent, nil
12✔
536
}
537

538
// waitForSnapshotReady wait for completionPercent of snapshot is 100.0
539
func (d *DriverCore) waitForSnapshotReady(ctx context.Context, subsID, resourceGroup, snapshotName string, intervel, timeout time.Duration) error {
5✔
540
        completionPercent, err := d.getSnapshotCompletionPercent(ctx, subsID, resourceGroup, snapshotName)
5✔
541
        if err != nil {
7✔
542
                return err
2✔
543
        }
2✔
544

545
        if completionPercent >= float32(100.0) {
5✔
546
                klog.V(2).Infof("snapshot(%s) under rg(%s) complete", snapshotName, resourceGroup)
2✔
547
                return nil
2✔
548
        }
2✔
549

550
        timeTick := time.Tick(intervel)
1✔
551
        timeAfter := time.After(timeout)
1✔
552
        for {
12✔
553
                select {
11✔
554
                case <-timeTick:
10✔
555
                        completionPercent, err = d.getSnapshotCompletionPercent(ctx, subsID, resourceGroup, snapshotName)
10✔
556
                        if err != nil {
10✔
557
                                return err
×
558
                        }
×
559

560
                        if completionPercent >= float32(100.0) {
10✔
561
                                klog.V(2).Infof("snapshot(%s) under rg(%s) complete", snapshotName, resourceGroup)
×
562
                                return nil
×
563
                        }
×
564
                        klog.V(2).Infof("snapshot(%s) under rg(%s) completionPercent: %f", snapshotName, resourceGroup, completionPercent)
10✔
565
                case <-timeAfter:
1✔
566
                        return fmt.Errorf("timeout waiting for snapshot(%s) under rg(%s)", snapshotName, resourceGroup)
1✔
567
                }
568
        }
569
}
570

571
// getUsedLunsFromVolumeAttachments returns a list of used luns from VolumeAttachments
572
func (d *DriverCore) getUsedLunsFromVolumeAttachments(ctx context.Context, nodeName string) ([]int, error) {
1✔
573
        kubeClient := d.cloud.KubeClient
1✔
574
        if kubeClient == nil || kubeClient.StorageV1() == nil || kubeClient.StorageV1().VolumeAttachments() == nil {
2✔
575
                return nil, fmt.Errorf("kubeClient or kubeClient.StorageV1() or kubeClient.StorageV1().VolumeAttachments() is nil")
1✔
576
        }
1✔
577

578
        volumeAttachments, err := kubeClient.StorageV1().VolumeAttachments().List(ctx, metav1.ListOptions{
×
579
                TimeoutSeconds: ptr.To(int64(2))})
×
580
        if err != nil {
×
581
                return nil, err
×
582
        }
×
583

584
        usedLuns := make([]int, 0)
×
585
        if volumeAttachments == nil {
×
586
                klog.V(2).Infof("volumeAttachments is nil")
×
587
                return usedLuns, nil
×
588
        }
×
589

590
        klog.V(2).Infof("volumeAttachments count: %d, nodeName: %s", len(volumeAttachments.Items), nodeName)
×
591
        for _, va := range volumeAttachments.Items {
×
592
                klog.V(6).Infof("attacher: %s, nodeName: %s, Status: %v, PV: %s, attachmentMetadata: %v", va.Spec.Attacher, va.Spec.NodeName,
×
593
                        va.Status.Attached, ptr.Deref(va.Spec.Source.PersistentVolumeName, ""), va.Status.AttachmentMetadata)
×
594
                if va.Spec.Attacher == d.Name && strings.EqualFold(va.Spec.NodeName, nodeName) && va.Status.Attached {
×
595
                        if k, ok := va.Status.AttachmentMetadata[consts.LUN]; ok {
×
596
                                lun, err := strconv.Atoi(k)
×
597
                                if err != nil {
×
598
                                        klog.Warningf("VolumeAttachment(%s) lun(%s) is not a valid integer", va.Name, k)
×
599
                                        continue
×
600
                                }
601
                                usedLuns = append(usedLuns, lun)
×
602
                        }
603
                }
604
        }
605
        return usedLuns, nil
×
606
}
607

608
// getUsedLunsFromNode returns a list of sorted used luns from Node
609
func (d *DriverCore) getUsedLunsFromNode(nodeName types.NodeName) ([]int, error) {
1✔
610
        disks, _, err := d.diskController.GetNodeDataDisks(nodeName, azcache.CacheReadTypeDefault)
1✔
611
        if err != nil {
1✔
612
                klog.Errorf("error of getting data disks for node %s: %v", nodeName, err)
×
613
                return nil, err
×
614
        }
×
615

616
        usedLuns := make([]int, 0)
1✔
617
        // get all disks attached to the node
1✔
618
        for _, disk := range disks {
3✔
619
                if disk.Lun == nil {
2✔
620
                        klog.Warningf("disk(%s) lun is nil", *disk.Name)
×
621
                        continue
×
622
                }
623
                usedLuns = append(usedLuns, int(*disk.Lun))
2✔
624
        }
625
        return usedLuns, nil
1✔
626
}
627

628
// getNodeInfoFromLabels get zone, instanceType from node labels
629
func getNodeInfoFromLabels(ctx context.Context, nodeName string, kubeClient clientset.Interface) (string, string, error) {
2✔
630
        if kubeClient == nil || kubeClient.CoreV1() == nil {
4✔
631
                return "", "", fmt.Errorf("kubeClient is nil")
2✔
632
        }
2✔
633

634
        node, err := kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
×
635
        if err != nil {
×
636
                return "", "", fmt.Errorf("get node(%s) failed with %v", nodeName, err)
×
637
        }
×
638

639
        if len(node.Labels) == 0 {
×
640
                return "", "", fmt.Errorf("node(%s) label is empty", nodeName)
×
641
        }
×
642
        return node.Labels[consts.WellKnownTopologyKey], node.Labels[consts.InstanceTypeKey], nil
×
643
}
644

645
// getDefaultDiskIOPSReadWrite according to requestGiB
646
//
647
//        ref: https://docs.microsoft.com/en-us/azure/virtual-machines/disks-types#ultra-disk-iops
648
func getDefaultDiskIOPSReadWrite(requestGiB int) int {
9✔
649
        iops := azurecloudconsts.DefaultDiskIOPSReadWrite
9✔
650
        if requestGiB > iops {
16✔
651
                iops = requestGiB
7✔
652
        }
7✔
653
        if iops > 160000 {
12✔
654
                iops = 160000
3✔
655
        }
3✔
656
        return iops
9✔
657
}
658

659
// getDefaultDiskMBPSReadWrite according to requestGiB
660
//
661
//        ref: https://docs.microsoft.com/en-us/azure/virtual-machines/disks-types#ultra-disk-throughput
662
func getDefaultDiskMBPSReadWrite(requestGiB int) int {
6✔
663
        bandwidth := azurecloudconsts.DefaultDiskMBpsReadWrite
6✔
664
        iops := getDefaultDiskIOPSReadWrite(requestGiB)
6✔
665
        if iops/256 > bandwidth {
10✔
666
                bandwidth = int(volumehelper.RoundUpSize(int64(iops), 256))
4✔
667
        }
4✔
668
        if bandwidth > iops/4 {
6✔
669
                bandwidth = int(volumehelper.RoundUpSize(int64(iops), 4))
×
670
        }
×
671
        if bandwidth > 4000 {
6✔
672
                bandwidth = 4000
×
673
        }
×
674
        return bandwidth
6✔
675
}
676

677
// getVMSSInstanceName converts a VMSS compute name into the instance name,
// e.g. "aks-agentpool-20657377-vmss_2" -> "aks-agentpool-20657377-vmss000002".
//
// computeName must have the form "<prefix>_<decimal instance ID>". The instance
// ID is rendered in base 36 (lowercase) and left-padded with zeros to at least
// six characters before being appended to the prefix.
//
// An error is returned when computeName does not contain exactly one "_", when
// the part after it is not a valid integer, or when that integer is negative.
func getVMSSInstanceName(computeName string) (string, error) {
	names := strings.Split(computeName, "_")
	if len(names) != 2 {
		return "", fmt.Errorf("invalid vmss compute name: %s", computeName)
	}

	instanceID, err := strconv.Atoi(names[1])
	if err != nil {
		// %w keeps the message identical while preserving the error chain.
		return "", fmt.Errorf("parsing vmss compute name(%s) failed with %w", computeName, err)
	}
	if instanceID < 0 {
		// A negative ID would otherwise be rendered as e.g. "0000-1", which is
		// not a meaningful instance name.
		return "", fmt.Errorf("invalid vmss compute name: %s", computeName)
	}

	// Pad explicitly rather than depending on the '0' flag being honoured for
	// the %s verb.
	suffix := strconv.FormatInt(int64(instanceID), 36)
	if pad := 6 - len(suffix); pad > 0 {
		suffix = strings.Repeat("0", pad) + suffix
	}
	return names[0] + suffix, nil
}
690

691
// JSONPatch is a single operation of a JSON patch document as serialized by
// encoding/json. OP and Path are omitted from the output when empty; Value is
// always emitted, so a nil Value is encoded as null.
type JSONPatch struct {
	OP    string `json:"op,omitempty"`
	Path  string `json:"path,omitempty"`
	Value any    `json:"value"`
}
697

698
// removeTaintInBackground is a goroutine that retries removeNotReadyTaint with exponential backoff
699
func removeTaintInBackground(k8sClient kubernetes.Interface, nodeName, driverName string, backoff wait.Backoff, removalFunc func(kubernetes.Interface, string, string) error) {
×
700
        backoffErr := wait.ExponentialBackoff(backoff, func() (bool, error) {
×
701
                err := removalFunc(k8sClient, nodeName, driverName)
×
702
                if err != nil {
×
703
                        klog.ErrorS(err, "Unexpected failure when attempting to remove node taint(s)")
×
704
                        return false, nil
×
705
                }
×
706
                return true, nil
×
707
        })
708

709
        if backoffErr != nil {
×
710
                klog.ErrorS(backoffErr, "Retries exhausted, giving up attempting to remove node taint(s)")
×
711
        }
×
712
}
713

714
// removeNotReadyTaint removes the taint disk.csi.azure.com/agent-not-ready from the local node
715
// This taint can be optionally applied by users to prevent startup race conditions such as
716
// https://github.com/kubernetes/kubernetes/issues/95911
717
func removeNotReadyTaint(clientset kubernetes.Interface, nodeName, driverName string) error {
×
718
        ctx := context.Background()
×
719
        node, err := clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
×
720
        if err != nil {
×
721
                return err
×
722
        }
×
723

724
        if err := checkAllocatable(ctx, clientset, nodeName, driverName); err != nil {
×
725
                return err
×
726
        }
×
727

728
        taintKeyToRemove := driverName + consts.AgentNotReadyNodeTaintKeySuffix
×
729
        klog.V(2).Infof("removing taint with key %s from local node %s", taintKeyToRemove, nodeName)
×
730
        var taintsToKeep []corev1.Taint
×
731
        for _, taint := range node.Spec.Taints {
×
732
                klog.V(5).Infof("checking taint key %s, value %s, effect %s", taint.Key, taint.Value, taint.Effect)
×
733
                if taint.Key != taintKeyToRemove {
×
734
                        taintsToKeep = append(taintsToKeep, taint)
×
735
                } else {
×
736
                        klog.V(2).Infof("queued taint for removal with key %s, effect %s", taint.Key, taint.Effect)
×
737
                }
×
738
        }
739

740
        if len(taintsToKeep) == len(node.Spec.Taints) {
×
741
                klog.V(2).Infof("No taints to remove on node, skipping taint removal")
×
742
                return nil
×
743
        }
×
744

745
        patchRemoveTaints := []JSONPatch{
×
746
                {
×
747
                        OP:    "test",
×
748
                        Path:  "/spec/taints",
×
749
                        Value: node.Spec.Taints,
×
750
                },
×
751
                {
×
752
                        OP:    "replace",
×
753
                        Path:  "/spec/taints",
×
754
                        Value: taintsToKeep,
×
755
                },
×
756
        }
×
757

×
758
        patch, err := json.Marshal(patchRemoveTaints)
×
759
        if err != nil {
×
760
                return err
×
761
        }
×
762

763
        _, err = clientset.CoreV1().Nodes().Patch(ctx, nodeName, k8stypes.JSONPatchType, patch, metav1.PatchOptions{})
×
764
        if err != nil {
×
765
                return err
×
766
        }
×
767
        klog.V(2).Infof("removed taint with key %s from local node %s successfully", taintKeyToRemove, nodeName)
×
768
        return nil
×
769
}
770

771
func checkAllocatable(ctx context.Context, clientset kubernetes.Interface, nodeName, driverName string) error {
×
772
        csiNode, err := clientset.StorageV1().CSINodes().Get(ctx, nodeName, metav1.GetOptions{})
×
773
        if err != nil {
×
774
                return fmt.Errorf("isAllocatableSet: failed to get CSINode for %s: %w", nodeName, err)
×
775
        }
×
776

777
        for _, driver := range csiNode.Spec.Drivers {
×
778
                if driver.Name == driverName {
×
779
                        if driver.Allocatable != nil && driver.Allocatable.Count != nil {
×
780
                                klog.V(2).Infof("CSINode Allocatable value is set for driver on node %s, count %d", nodeName, *driver.Allocatable.Count)
×
781
                                return nil
×
782
                        }
×
783
                        return fmt.Errorf("isAllocatableSet: allocatable value not set for driver on node %s", nodeName)
×
784
                }
785
        }
786

787
        return fmt.Errorf("isAllocatableSet: driver not found on node %s", nodeName)
×
788
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc