• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubernetes-sigs / blob-csi-driver / 14642486131

24 Apr 2025 01:10PM UTC coverage: 78.052% (-0.03%) from 78.078%
14642486131

Pull #1964

github

andyzhangx
fix: incorrect metrics log in NodeStageVolume
Pull Request #1964: fix: incorrect metrics log in NodeStageVolume

0 of 1 new or added line in 1 file covered. (0.0%)

1 existing line in 1 file now uncovered.

2340 of 2998 relevant lines covered (78.05%)

7.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.01
/pkg/blob/nodeserver.go
1
/*
2
Copyright 2017 The Kubernetes Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package blob
18

19
import (
20
        "fmt"
21
        "io/fs"
22
        "os"
23
        "os/exec"
24
        "path/filepath"
25
        "strconv"
26
        "strings"
27
        "time"
28

29
        volumehelper "sigs.k8s.io/blob-csi-driver/pkg/util"
30
        azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache"
31
        "sigs.k8s.io/cloud-provider-azure/pkg/metrics"
32

33
        "github.com/container-storage-interface/spec/lib/go/csi"
34

35
        "k8s.io/klog/v2"
36
        "k8s.io/kubernetes/pkg/volume"
37
        "k8s.io/kubernetes/pkg/volume/util"
38
        mount "k8s.io/mount-utils"
39

40
        "google.golang.org/grpc/codes"
41
        "google.golang.org/grpc/status"
42

43
        "golang.org/x/net/context"
44
        "google.golang.org/grpc"
45
        mount_azure_blob "sigs.k8s.io/blob-csi-driver/pkg/blobfuse-proxy/pb"
46
)
47

48
// Polling parameters handed to waitForMount once a blobfuse mount command has returned.
const (
49
        // waitForMountInterval is the interval argument passed to waitForMount.
        waitForMountInterval = 20 * time.Millisecond
50
        // waitForMountTimeout is the timeout argument passed to waitForMount.
        waitForMountTimeout  = 60 * time.Second
51
)
52

53
// MountClient wraps the blobfuse-proxy mount service client that is used to
// send MountAzureBlob requests.
type MountClient struct {
54
        // service is the client returned by mount_azure_blob.NewMountServiceClient.
        service mount_azure_blob.MountServiceClient
55
}
56

57
// NewMountClient returns a new mount client
58
func NewMountClient(cc *grpc.ClientConn) *MountClient {
1✔
59
        service := mount_azure_blob.NewMountServiceClient(cc)
1✔
60
        return &MountClient{service}
1✔
61
}
1✔
62

63
// NodePublishVolume mount the volume from staging to target path
64
func (d *Driver) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
8✔
65
        volCap := req.GetVolumeCapability()
8✔
66
        if volCap == nil {
9✔
67
                return nil, status.Error(codes.InvalidArgument, "Volume capability missing in request")
1✔
68
        }
1✔
69
        volumeID := req.GetVolumeId()
7✔
70
        if len(req.GetVolumeId()) == 0 {
8✔
71
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
72
        }
1✔
73

74
        target := req.GetTargetPath()
6✔
75
        if len(target) == 0 {
7✔
76
                return nil, status.Error(codes.InvalidArgument, "Target path not provided")
1✔
77
        }
1✔
78

79
        mountPermissions := d.mountPermissions
5✔
80
        context := req.GetVolumeContext()
5✔
81
        if context != nil {
7✔
82
                // token request
2✔
83
                if context[serviceAccountTokenField] != "" && getValueInMap(context, clientIDField) != "" {
2✔
84
                        klog.V(2).Infof("NodePublishVolume: volume(%s) mount on %s with service account token, clientID: %s", volumeID, target, getValueInMap(context, clientIDField))
×
85
                        _, err := d.NodeStageVolume(ctx, &csi.NodeStageVolumeRequest{
×
86
                                StagingTargetPath: target,
×
87
                                VolumeContext:     context,
×
88
                                VolumeCapability:  volCap,
×
89
                                VolumeId:          volumeID,
×
90
                        })
×
91
                        return &csi.NodePublishVolumeResponse{}, err
×
92
                }
×
93

94
                // ephemeral volume
95
                if strings.EqualFold(context[ephemeralField], trueValue) {
2✔
96
                        setKeyValueInMap(context, secretNamespaceField, context[podNamespaceField])
×
97
                        if !d.allowInlineVolumeKeyAccessWithIdentity {
×
98
                                // only get storage account from secret
×
99
                                setKeyValueInMap(context, getAccountKeyFromSecretField, trueValue)
×
100
                                setKeyValueInMap(context, storageAccountField, "")
×
101
                        }
×
102
                        klog.V(2).Infof("NodePublishVolume: ephemeral volume(%s) mount on %s", volumeID, target)
×
103
                        _, err := d.NodeStageVolume(ctx, &csi.NodeStageVolumeRequest{
×
104
                                StagingTargetPath: target,
×
105
                                VolumeContext:     context,
×
106
                                VolumeCapability:  volCap,
×
107
                                VolumeId:          volumeID,
×
108
                        })
×
109
                        return &csi.NodePublishVolumeResponse{}, err
×
110
                }
111

112
                if perm := getValueInMap(context, mountPermissionsField); perm != "" {
4✔
113
                        var err error
2✔
114
                        if mountPermissions, err = strconv.ParseUint(perm, 8, 32); err != nil {
3✔
115
                                return nil, status.Errorf(codes.InvalidArgument, "invalid mountPermissions %s", perm)
1✔
116
                        }
1✔
117
                }
118
        }
119

120
        source := req.GetStagingTargetPath()
4✔
121
        if len(source) == 0 {
5✔
122
                return nil, status.Error(codes.InvalidArgument, "Staging target not provided")
1✔
123
        }
1✔
124

125
        mountOptions := []string{"bind"}
3✔
126
        if req.GetReadonly() {
6✔
127
                mountOptions = append(mountOptions, "ro")
3✔
128
        }
3✔
129

130
        mnt, err := d.ensureMountPoint(target, fs.FileMode(mountPermissions))
3✔
131
        if err != nil {
4✔
132
                return nil, status.Errorf(codes.Internal, "Could not mount target %q: %v", target, err)
1✔
133
        }
1✔
134
        if mnt {
2✔
135
                klog.V(2).Infof("NodePublishVolume: volume %s is already mounted on %s", volumeID, target)
×
136
                return &csi.NodePublishVolumeResponse{}, nil
×
137
        }
×
138

139
        klog.V(2).Infof("NodePublishVolume: volume %s mounting %s at %s with mountOptions: %v", volumeID, source, target, mountOptions)
2✔
140
        if d.enableBlobMockMount {
2✔
141
                klog.Warningf("NodePublishVolume: mock mount on volumeID(%s), this is only for TESTING!!!", volumeID)
×
142
                if err := volumehelper.MakeDir(target, os.FileMode(mountPermissions)); err != nil {
×
143
                        klog.Errorf("MakeDir failed on target: %s (%v)", target, err)
×
144
                        return nil, status.Errorf(codes.Internal, "%v", err)
×
145
                }
×
146
                return &csi.NodePublishVolumeResponse{}, nil
×
147
        }
148

149
        if err := d.mounter.Mount(source, target, "", mountOptions); err != nil {
2✔
150
                if removeErr := os.Remove(target); removeErr != nil {
×
151
                        return nil, status.Errorf(codes.Internal, "Could not remove mount target %q: %v", target, removeErr)
×
152
                }
×
153
                return nil, status.Errorf(codes.Internal, "Could not mount %q at %q: %v", source, target, err)
×
154
        }
155
        klog.V(2).Infof("NodePublishVolume: volume %s mount %s at %s successfully", volumeID, source, target)
2✔
156

2✔
157
        return &csi.NodePublishVolumeResponse{}, nil
2✔
158
}
159

160
func (d *Driver) mountBlobfuseWithProxy(args, protocol string, authEnv []string) (string, error) {
1✔
161
        var resp *mount_azure_blob.MountAzureBlobResponse
1✔
162
        var output string
1✔
163
        connectionTimout := time.Duration(d.blobfuseProxyConnTimout) * time.Second
1✔
164
        ctx, cancel := context.WithTimeout(context.Background(), connectionTimout)
1✔
165
        defer cancel()
1✔
166
        klog.V(2).Infof("start connecting to blobfuse proxy, protocol: %s, args: %s", protocol, args)
1✔
167
        conn, err := grpc.DialContext(ctx, d.blobfuseProxyEndpoint, grpc.WithInsecure(), grpc.WithBlock())
1✔
168
        if err != nil {
2✔
169
                klog.Errorf("failed to connect to blobfuse proxy: %v", err)
1✔
170
                return "", err
1✔
171
        }
1✔
172
        defer func() {
×
173
                if err := conn.Close(); err != nil {
×
174
                        klog.Errorf("failed to close connection to blobfuse proxy: %v", err)
×
175
                }
×
176
        }()
177

178
        mountClient := NewMountClient(conn)
×
179
        mountreq := mount_azure_blob.MountAzureBlobRequest{
×
180
                MountArgs: args,
×
181
                Protocol:  protocol,
×
182
                AuthEnv:   authEnv,
×
183
        }
×
184
        klog.V(2).Infof("begin to mount with blobfuse proxy, protocol: %s, args: %s", protocol, args)
×
185
        resp, err = mountClient.service.MountAzureBlob(context.TODO(), &mountreq)
×
186
        if err != nil {
×
187
                klog.Error("GRPC call returned with an error:", err)
×
188
        }
×
189
        output = resp.GetOutput()
×
190

×
191
        return output, err
×
192
}
193

194
func (d *Driver) mountBlobfuseInsideDriver(args string, protocol string, authEnv []string) (string, error) {
1✔
195
        var cmd *exec.Cmd
1✔
196

1✔
197
        args = volumehelper.TrimDuplicatedSpace(args)
1✔
198

1✔
199
        mountLog := "mount inside driver with"
1✔
200
        if protocol == Fuse2 {
1✔
201
                mountLog += " v2"
×
202
                args = "mount " + args
×
203
                cmd = exec.Command("blobfuse2", strings.Split(args, " ")...)
×
204
        } else {
1✔
205
                mountLog += " v1"
1✔
206
                cmd = exec.Command("blobfuse", strings.Split(args, " ")...)
1✔
207
        }
1✔
208
        klog.V(2).Infof("%s, protocol: %s, args: %s", mountLog, protocol, args)
1✔
209

1✔
210
        cmd.Env = append(os.Environ(), authEnv...)
1✔
211
        output, err := cmd.CombinedOutput()
1✔
212
        klog.V(2).Infof("mount output: %s\n", string(output))
1✔
213

1✔
214
        return string(output), err
1✔
215
}
216

217
// NodeUnpublishVolume unmount the volume from the target path
218
func (d *Driver) NodeUnpublishVolume(_ context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
3✔
219
        volumeID := req.GetVolumeId()
3✔
220
        if len(volumeID) == 0 {
4✔
221
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
222
        }
1✔
223
        targetPath := req.GetTargetPath()
2✔
224
        if len(targetPath) == 0 {
3✔
225
                return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
1✔
226
        }
1✔
227

228
        klog.V(2).Infof("NodeUnpublishVolume: unmounting volume %s on %s", volumeID, targetPath)
1✔
229
        err := mount.CleanupMountPoint(targetPath, d.mounter, true /*extensiveMountPointCheck*/)
1✔
230
        if err != nil {
1✔
231
                return nil, status.Errorf(codes.Internal, "failed to unmount target %q: %v", targetPath, err)
×
232
        }
×
233
        klog.V(2).Infof("NodeUnpublishVolume: unmount volume %s on %s successfully", volumeID, targetPath)
1✔
234

1✔
235
        return &csi.NodeUnpublishVolumeResponse{}, nil
1✔
236
}
237

238
// NodeStageVolume mount the volume to a staging path
239
func (d *Driver) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
10✔
240
        volumeID := req.GetVolumeId()
10✔
241
        if len(volumeID) == 0 {
11✔
242
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
243
        }
1✔
244
        targetPath := req.GetStagingTargetPath()
9✔
245
        if len(targetPath) == 0 {
10✔
246
                return nil, status.Error(codes.InvalidArgument, "Staging target not provided")
1✔
247
        }
1✔
248
        volumeCapability := req.GetVolumeCapability()
8✔
249
        if volumeCapability == nil {
9✔
250
                return nil, status.Error(codes.InvalidArgument, "Volume capability not provided")
1✔
251
        }
1✔
252

253
        lockKey := fmt.Sprintf("%s-%s", volumeID, targetPath)
7✔
254
        if acquired := d.volumeLocks.TryAcquire(lockKey); !acquired {
9✔
255
                return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
2✔
256
        }
2✔
257
        defer d.volumeLocks.Release(lockKey)
5✔
258

5✔
259
        mountFlags := req.GetVolumeCapability().GetMount().GetMountFlags()
5✔
260
        volumeMountGroup := req.GetVolumeCapability().GetMount().GetVolumeMountGroup()
5✔
261
        attrib := req.GetVolumeContext()
5✔
262
        secrets := req.GetSecrets()
5✔
263

5✔
264
        if getValueInMap(attrib, clientIDField) != "" && attrib[serviceAccountTokenField] == "" {
5✔
265
                klog.V(2).Infof("Skip NodeStageVolume for volume(%s) since clientID %s is provided but service account token is empty", volumeID, getValueInMap(attrib, clientIDField))
×
266
                return &csi.NodeStageVolumeResponse{}, nil
×
267
        }
×
268

269
        mc := metrics.NewMetricContext(blobCSIDriverName, "node_stage_volume", d.cloud.ResourceGroup, "", d.Name)
5✔
270
        isOperationSucceeded := false
5✔
271
        defer func() {
10✔
272
                mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
5✔
273
        }()
5✔
274

275
        var serverAddress, storageEndpointSuffix, protocol, ephemeralVolMountOptions string
5✔
276
        var ephemeralVol, isHnsEnabled bool
5✔
277

5✔
278
        containerNameReplaceMap := map[string]string{}
5✔
279

5✔
280
        fsGroupChangePolicy := d.fsGroupChangePolicy
5✔
281

5✔
282
        mountPermissions := d.mountPermissions
5✔
283
        performChmodOp := (mountPermissions > 0)
5✔
284
        for k, v := range attrib {
12✔
285
                switch strings.ToLower(k) {
7✔
286
                case serverNameField:
×
287
                        serverAddress = v
×
288
                case protocolField:
2✔
289
                        protocol = v
2✔
290
                case storageEndpointSuffixField:
×
291
                        storageEndpointSuffix = v
×
292
                case ephemeralField:
×
293
                        ephemeralVol = strings.EqualFold(v, trueValue)
×
294
                case mountOptionsField:
×
295
                        ephemeralVolMountOptions = v
×
296
                case isHnsEnabledField:
×
297
                        isHnsEnabled = strings.EqualFold(v, trueValue)
×
298
                case pvcNamespaceKey:
×
299
                        containerNameReplaceMap[pvcNamespaceMetadata] = v
×
300
                case pvcNameKey:
×
301
                        containerNameReplaceMap[pvcNameMetadata] = v
×
302
                case pvNameKey:
×
303
                        containerNameReplaceMap[pvNameMetadata] = v
×
304
                case mountPermissionsField:
4✔
305
                        if v != "" {
8✔
306
                                var err error
4✔
307
                                var perm uint64
4✔
308
                                if perm, err = strconv.ParseUint(v, 8, 32); err != nil {
5✔
309
                                        return nil, status.Errorf(codes.InvalidArgument, "invalid mountPermissions %s", v)
1✔
310
                                }
1✔
311
                                if perm == 0 {
3✔
312
                                        performChmodOp = false
×
313
                                } else {
3✔
314
                                        mountPermissions = perm
3✔
315
                                }
3✔
316
                        }
317
                case fsGroupChangePolicyField:
1✔
318
                        fsGroupChangePolicy = v
1✔
319
                }
320
        }
321

322
        if !isSupportedFSGroupChangePolicy(fsGroupChangePolicy) {
5✔
323
                return nil, status.Errorf(codes.InvalidArgument, "fsGroupChangePolicy(%s) is not supported, supported fsGroupChangePolicy list: %v", fsGroupChangePolicy, supportedFSGroupChangePolicyList)
1✔
324
        }
1✔
325

326
        mnt, err := d.ensureMountPoint(targetPath, fs.FileMode(mountPermissions))
3✔
327
        if err != nil {
4✔
328
                return nil, status.Errorf(codes.Internal, "Could not mount target %q: %v", targetPath, err)
1✔
329
        }
1✔
330
        if mnt {
2✔
331
                klog.V(2).Infof("NodeStageVolume: volume %s is already mounted on %s", volumeID, targetPath)
×
332
                return &csi.NodeStageVolumeResponse{}, nil
×
333
        }
×
334

335
        _, accountName, _, containerName, authEnv, err := d.GetAuthEnv(ctx, volumeID, protocol, attrib, secrets)
2✔
336
        if err != nil {
2✔
337
                return nil, status.Errorf(codes.Internal, "%v", err)
×
338
        }
×
339

340
        // replace pv/pvc name namespace metadata in subDir
341
        containerName = replaceWithMap(containerName, containerNameReplaceMap)
2✔
342

2✔
343
        if strings.TrimSpace(storageEndpointSuffix) == "" {
4✔
344
                storageEndpointSuffix = d.getStorageEndPointSuffix()
2✔
345
        }
2✔
346

347
        if strings.TrimSpace(serverAddress) == "" {
4✔
348
                // server address is "accountname.blob.core.windows.net" by default
2✔
349
                serverAddress = fmt.Sprintf("%s.blob.%s", accountName, storageEndpointSuffix)
2✔
350
        }
2✔
351

352
        if isReadOnlyFromCapability(volumeCapability) {
2✔
353
                if isNFSProtocol(protocol) {
×
354
                        mountFlags = util.JoinMountOptions(mountFlags, []string{"ro"})
×
355
                } else {
×
356
                        mountFlags = util.JoinMountOptions(mountFlags, []string{"-o ro"})
×
357
                }
×
358
                klog.V(2).Infof("CSI volume is read-only, mounting with extra option ro")
×
359
        }
360

361
        if isNFSProtocol(protocol) {
3✔
362
                klog.V(2).Infof("target %v\nprotocol %v\n\nvolumeId %v\nmountflags %v\nserverAddress %v",
1✔
363
                        targetPath, protocol, volumeID, mountFlags, serverAddress)
1✔
364

1✔
365
                mountType := AZNFS
1✔
366
                if !d.enableAznfsMount || protocol == NFSv3 {
2✔
367
                        mountType = NFS
1✔
368
                }
1✔
369

370
                if storageEndpointSuffix != "" && mountType == AZNFS {
1✔
371
                        aznfsEndpoint := strings.Replace(storageEndpointSuffix, "core.", "", 1)
×
372
                        klog.V(2).Infof("set AZURE_ENDPOINT_OVERRIDE to %s", aznfsEndpoint)
×
373
                        os.Setenv("AZURE_ENDPOINT_OVERRIDE", aznfsEndpoint)
×
374
                }
×
375

376
                source := fmt.Sprintf("%s:/%s/%s", serverAddress, accountName, containerName)
1✔
377
                mountOptions := util.JoinMountOptions(mountFlags, []string{"sec=sys,vers=3,nolock"})
1✔
378
                execFunc := func() error { return d.mounter.MountSensitive(source, targetPath, mountType, mountOptions, []string{}) }
2✔
379
                timeoutFunc := func() error { return fmt.Errorf("time out") }
1✔
380
                if err := volumehelper.WaitUntilTimeout(90*time.Second, execFunc, timeoutFunc); err != nil {
1✔
381
                        var helpLinkMsg string
×
382
                        if d.appendMountErrorHelpLink {
×
383
                                helpLinkMsg = "\nPlease refer to http://aka.ms/blobmounterror for possible causes and solutions for mount errors."
×
384
                        }
×
385
                        return nil, status.Error(codes.Internal, fmt.Sprintf("volume(%s) mount %q on %q failed with %v%s", volumeID, source, targetPath, err, helpLinkMsg))
×
386
                }
387

388
                if performChmodOp {
1✔
389
                        if err := chmodIfPermissionMismatch(targetPath, os.FileMode(mountPermissions)); err != nil {
×
390
                                return nil, status.Error(codes.Internal, err.Error())
×
391
                        }
×
392
                } else {
1✔
393
                        klog.V(2).Infof("skip chmod on targetPath(%s) since mountPermissions is set as 0", targetPath)
1✔
394
                }
1✔
395

396
                if volumeMountGroup != "" && fsGroupChangePolicy != FSGroupChangeNone {
1✔
397
                        klog.V(2).Infof("set gid of volume(%s) as %s using fsGroupChangePolicy(%s)", volumeID, volumeMountGroup, fsGroupChangePolicy)
×
398
                        if err := volumehelper.SetVolumeOwnership(targetPath, volumeMountGroup, fsGroupChangePolicy); err != nil {
×
399
                                return nil, status.Error(codes.Internal, fmt.Sprintf("SetVolumeOwnership with volume(%s) on %s failed with %v", volumeID, targetPath, err))
×
400
                        }
×
401
                }
402

403
                isOperationSucceeded = true
1✔
404
                klog.V(2).Infof("volume(%s) mount %s on %s succeeded", volumeID, source, targetPath)
1✔
405
                return &csi.NodeStageVolumeResponse{}, nil
1✔
406
        }
407

408
        // Get mountOptions that the volume will be formatted and mounted with
409
        mountOptions := mountFlags
1✔
410
        if ephemeralVol {
1✔
411
                mountOptions = util.JoinMountOptions(mountOptions, strings.Split(ephemeralVolMountOptions, ","))
×
412
        }
×
413
        if isHnsEnabled {
1✔
414
                mountOptions = util.JoinMountOptions(mountOptions, []string{"--use-adls=true"})
×
415
        }
×
416

417
        if !checkGidPresentInMountFlags(mountFlags) && volumeMountGroup != "" {
1✔
418
                klog.V(2).Infof("append volumeMountGroup %s", volumeMountGroup)
×
419
                mountOptions = append(mountOptions, fmt.Sprintf("-o gid=%s", volumeMountGroup))
×
420
        }
×
421

422
        tmpPath := fmt.Sprintf("%s/%s", "/mnt", volumeID)
1✔
423
        if d.appendTimeStampInCacheDir {
1✔
424
                tmpPath += fmt.Sprintf("#%d", time.Now().Unix())
×
425
        }
×
426
        mountOptions = appendDefaultMountOptions(mountOptions, tmpPath, containerName)
1✔
427

1✔
428
        args := targetPath
1✔
429
        for _, opt := range mountOptions {
7✔
430
                args = args + " " + opt
6✔
431
        }
6✔
432

433
        klog.V(2).Infof("target %v protocol %v volumeId %v\nmountflags %v\nmountOptions %v volumeMountGroup %s\nargs %v\nserverAddress %v",
1✔
434
                targetPath, protocol, volumeID, mountFlags, mountOptions, volumeMountGroup, args, serverAddress)
1✔
435

1✔
436
        authEnv = append(authEnv, "AZURE_STORAGE_ACCOUNT="+accountName, "AZURE_STORAGE_BLOB_ENDPOINT="+serverAddress)
1✔
437
        if d.enableBlobMockMount {
2✔
438
                klog.Warningf("NodeStageVolume: mock mount on volumeID(%s), this is only for TESTING!!!", volumeID)
1✔
439
                if err := volumehelper.MakeDir(targetPath, os.FileMode(mountPermissions)); err != nil {
1✔
440
                        klog.Errorf("MakeDir failed on target: %s (%v)", targetPath, err)
×
441
                        return nil, status.Errorf(codes.Internal, "%v", err)
×
442
                }
×
443
                return &csi.NodeStageVolumeResponse{}, nil
1✔
444
        }
445

446
        var output string
×
447
        if d.enableBlobfuseProxy {
×
448
                output, err = d.mountBlobfuseWithProxy(args, protocol, authEnv)
×
449
        } else {
×
450
                output, err = d.mountBlobfuseInsideDriver(args, protocol, authEnv)
×
451
        }
×
452

453
        if err != nil {
×
454
                var helpLinkMsg string
×
455
                if d.appendMountErrorHelpLink {
×
456
                        helpLinkMsg = "\nPlease refer to http://aka.ms/blobmounterror for possible causes and solutions for mount errors."
×
457
                }
×
458
                err = status.Errorf(codes.Internal, "Mount failed with error: %v, output: %v%s", err, output, helpLinkMsg)
×
459
                klog.Errorf("%v", err)
×
460
                notMnt, mntErr := d.mounter.IsLikelyNotMountPoint(targetPath)
×
461
                if mntErr != nil {
×
462
                        klog.Errorf("IsLikelyNotMountPoint check failed: %v", mntErr)
×
463
                        return nil, err
×
464
                }
×
465
                if !notMnt {
×
466
                        if mntErr = d.mounter.Unmount(targetPath); mntErr != nil {
×
467
                                klog.Errorf("Failed to unmount: %v", mntErr)
×
468
                                return nil, err
×
469
                        }
×
470
                        notMnt, mntErr := d.mounter.IsLikelyNotMountPoint(targetPath)
×
471
                        if mntErr != nil {
×
472
                                klog.Errorf("IsLikelyNotMountPoint check failed: %v", mntErr)
×
473
                                return nil, err
×
474
                        }
×
475
                        if !notMnt {
×
476
                                // This is very odd, we don't expect it.  We'll try again next sync loop.
×
477
                                klog.Errorf("%s is still mounted, despite call to unmount().  Will try again next sync loop.", targetPath)
×
478
                                return nil, err
×
479
                        }
×
480
                }
481
                os.Remove(targetPath)
×
482
                return nil, err
×
483
        }
484

485
        // wait a few seconds to make sure blobfuse mount is successful
486
        // please refer to https://github.com/Azure/azure-storage-fuse/pull/1088 for more details
487
        if err := waitForMount(targetPath, waitForMountInterval, waitForMountTimeout); err != nil {
×
488
                return nil, fmt.Errorf("failed to wait for mount: %w", err)
×
489
        }
×
490

491
        klog.V(2).Infof("volume(%s) mount on %q succeeded", volumeID, targetPath)
×
NEW
492
        isOperationSucceeded = true
×
UNCOV
493
        return &csi.NodeStageVolumeResponse{}, nil
×
494
}
495

496
// NodeUnstageVolume unmount the volume from the staging path
497
func (d *Driver) NodeUnstageVolume(_ context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
3✔
498
        volumeID := req.GetVolumeId()
3✔
499
        if len(volumeID) == 0 {
4✔
500
                return nil, status.Error(codes.InvalidArgument, "Volume ID not provided")
1✔
501
        }
1✔
502

503
        stagingTargetPath := req.GetStagingTargetPath()
2✔
504
        if len(stagingTargetPath) == 0 {
3✔
505
                return nil, status.Error(codes.InvalidArgument, "Staging target not provided")
1✔
506
        }
1✔
507

508
        lockKey := fmt.Sprintf("%s-%s", volumeID, stagingTargetPath)
1✔
509
        if acquired := d.volumeLocks.TryAcquire(lockKey); !acquired {
1✔
510
                return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
×
511
        }
×
512
        defer d.volumeLocks.Release(lockKey)
1✔
513

1✔
514
        mc := metrics.NewMetricContext(blobCSIDriverName, "node_unstage_volume", d.cloud.ResourceGroup, "", d.Name)
1✔
515
        isOperationSucceeded := false
1✔
516
        defer func() {
2✔
517
                mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
1✔
518
        }()
1✔
519

520
        klog.V(2).Infof("NodeUnstageVolume: volume %s unmounting on %s", volumeID, stagingTargetPath)
1✔
521
        err := mount.CleanupMountPoint(stagingTargetPath, d.mounter, true /*extensiveMountPointCheck*/)
1✔
522
        if err != nil {
1✔
523
                return nil, status.Errorf(codes.Internal, "failed to unmount staging target %q: %v", stagingTargetPath, err)
×
524
        }
×
525
        klog.V(2).Infof("NodeUnstageVolume: volume %s unmount on %s successfully", volumeID, stagingTargetPath)
1✔
526

1✔
527
        isOperationSucceeded = true
1✔
528
        return &csi.NodeUnstageVolumeResponse{}, nil
1✔
529
}
530

531
// NodeGetCapabilities return the capabilities of the Node plugin
532
func (d *Driver) NodeGetCapabilities(_ context.Context, _ *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
1✔
533
        return &csi.NodeGetCapabilitiesResponse{
1✔
534
                Capabilities: d.NSCap,
1✔
535
        }, nil
1✔
536
}
1✔
537

538
// NodeGetInfo return info of the node on which this plugin is running
539
func (d *Driver) NodeGetInfo(_ context.Context, _ *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
1✔
540
        return &csi.NodeGetInfoResponse{
1✔
541
                NodeId: d.NodeID,
1✔
542
        }, nil
1✔
543
}
1✔
544

545
// NodeExpandVolume node expand volume
546
func (d *Driver) NodeExpandVolume(_ context.Context, _ *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
1✔
547
        return nil, status.Error(codes.Unimplemented, "NodeExpandVolume is not yet implemented")
1✔
548
}
1✔
549

550
// NodeGetVolumeStats get volume stats
551
func (d *Driver) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
4✔
552
        if len(req.VolumeId) == 0 {
5✔
553
                return nil, status.Error(codes.InvalidArgument, "NodeGetVolumeStats volume ID was empty")
1✔
554
        }
1✔
555
        if len(req.VolumePath) == 0 {
4✔
556
                return nil, status.Error(codes.InvalidArgument, "NodeGetVolumeStats volume path was empty")
1✔
557
        }
1✔
558

559
        // check if the volume stats is cached
560
        cache, err := d.volStatsCache.Get(ctx, req.VolumeId, azcache.CacheReadTypeDefault)
2✔
561
        if err != nil {
2✔
562
                return nil, status.Errorf(codes.Internal, "%v", err)
×
563
        }
×
564
        if cache != nil {
2✔
565
                resp := cache.(*csi.NodeGetVolumeStatsResponse)
×
566
                klog.V(6).Infof("NodeGetVolumeStats: volume stats for volume %s path %s is cached", req.VolumeId, req.VolumePath)
×
567
                return resp, nil
×
568
        }
×
569

570
        mc := metrics.NewMetricContext(blobCSIDriverName, "node_get_volume_stats", d.cloud.ResourceGroup, "", d.Name)
2✔
571
        mc.LogLevel = 6 // change log level
2✔
572
        isOperationSucceeded := false
2✔
573
        defer func() {
4✔
574
                mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, req.VolumeId)
2✔
575
        }()
2✔
576

577
        if _, err := os.Lstat(req.VolumePath); err != nil {
3✔
578
                if os.IsNotExist(err) {
2✔
579
                        return nil, status.Errorf(codes.NotFound, "path %s does not exist", req.VolumePath)
1✔
580
                }
1✔
581
                return nil, status.Errorf(codes.Internal, "failed to stat file %s: %v", req.VolumePath, err)
×
582
        }
583

584
        klog.V(6).Infof("NodeGetVolumeStats: begin to get VolumeStats on volume %s path %s", req.VolumeId, req.VolumePath)
1✔
585

1✔
586
        volumeMetrics, err := volume.NewMetricsStatFS(req.VolumePath).GetMetrics()
1✔
587
        if err != nil {
1✔
588
                return nil, status.Errorf(codes.Internal, "failed to get metrics: %v", err)
×
589
        }
×
590

591
        available, ok := volumeMetrics.Available.AsInt64()
1✔
592
        if !ok {
1✔
593
                return nil, status.Errorf(codes.Internal, "failed to transform volume available size(%v)", volumeMetrics.Available)
×
594
        }
×
595
        capacity, ok := volumeMetrics.Capacity.AsInt64()
1✔
596
        if !ok {
1✔
597
                return nil, status.Errorf(codes.Internal, "failed to transform volume capacity size(%v)", volumeMetrics.Capacity)
×
598
        }
×
599
        used, ok := volumeMetrics.Used.AsInt64()
1✔
600
        if !ok {
1✔
601
                return nil, status.Errorf(codes.Internal, "failed to transform volume used size(%v)", volumeMetrics.Used)
×
602
        }
×
603

604
        inodesFree, ok := volumeMetrics.InodesFree.AsInt64()
1✔
605
        if !ok {
1✔
606
                return nil, status.Errorf(codes.Internal, "failed to transform disk inodes free(%v)", volumeMetrics.InodesFree)
×
607
        }
×
608
        inodes, ok := volumeMetrics.Inodes.AsInt64()
1✔
609
        if !ok {
1✔
610
                return nil, status.Errorf(codes.Internal, "failed to transform disk inodes(%v)", volumeMetrics.Inodes)
×
611
        }
×
612
        inodesUsed, ok := volumeMetrics.InodesUsed.AsInt64()
1✔
613
        if !ok {
1✔
614
                return nil, status.Errorf(codes.Internal, "failed to transform disk inodes used(%v)", volumeMetrics.InodesUsed)
×
615
        }
×
616

617
        resp := &csi.NodeGetVolumeStatsResponse{
1✔
618
                Usage: []*csi.VolumeUsage{
1✔
619
                        {
1✔
620
                                Unit:      csi.VolumeUsage_BYTES,
1✔
621
                                Available: available,
1✔
622
                                Total:     capacity,
1✔
623
                                Used:      used,
1✔
624
                        },
1✔
625
                        {
1✔
626
                                Unit:      csi.VolumeUsage_INODES,
1✔
627
                                Available: inodesFree,
1✔
628
                                Total:     inodes,
1✔
629
                                Used:      inodesUsed,
1✔
630
                        },
1✔
631
                },
1✔
632
        }
1✔
633

1✔
634
        isOperationSucceeded = true
1✔
635
        klog.V(6).Infof("NodeGetVolumeStats: volume stats for volume %s path %s is %v", req.VolumeId, req.VolumePath, resp)
1✔
636
        // cache the volume stats per volume
1✔
637
        d.volStatsCache.Set(req.VolumeId, resp)
1✔
638
        return resp, nil
1✔
639
}
640

641
// ensureMountPoint: create mount point if not exists
642
// return <true, nil> if it's already a mounted point otherwise return <false, nil>
643
func (d *Driver) ensureMountPoint(target string, perm os.FileMode) (bool, error) {
11✔
644
        notMnt, err := d.mounter.IsLikelyNotMountPoint(target)
11✔
645
        if err != nil && !os.IsNotExist(err) {
13✔
646
                if IsCorruptedDir(target) {
2✔
647
                        notMnt = false
×
648
                        klog.Warningf("detected corrupted mount for targetPath [%s]", target)
×
649
                } else {
2✔
650
                        return !notMnt, err
2✔
651
                }
2✔
652
        }
653

654
        // Check all the mountpoints in case IsLikelyNotMountPoint
655
        // cannot handle --bind mount
656
        mountList, err := d.mounter.List()
9✔
657
        if err != nil {
9✔
658
                return !notMnt, err
×
659
        }
×
660

661
        targetAbs, err := filepath.Abs(target)
9✔
662
        if err != nil {
9✔
663
                return !notMnt, err
×
664
        }
×
665

666
        for _, mountPoint := range mountList {
9✔
667
                if mountPoint.Path == targetAbs {
×
668
                        notMnt = false
×
669
                        break
×
670
                }
671
        }
672

673
        if !notMnt {
11✔
674
                // testing original mount point, make sure the mount link is valid
2✔
675
                _, err := os.ReadDir(target)
2✔
676
                if err == nil {
3✔
677
                        klog.V(2).Infof("already mounted to target %s", target)
1✔
678
                        return !notMnt, nil
1✔
679
                }
1✔
680
                // mount link is invalid, now unmount and remount later
681
                klog.Warningf("ReadDir %s failed with %v, unmount this directory", target, err)
1✔
682
                if err := d.mounter.Unmount(target); err != nil {
1✔
683
                        klog.Errorf("Unmount directory %s failed with %v", target, err)
×
684
                        return !notMnt, err
×
685
                }
×
686
                notMnt = true
1✔
687
                return !notMnt, err
1✔
688
        }
689
        if err := volumehelper.MakeDir(target, perm); err != nil {
9✔
690
                klog.Errorf("MakeDir failed on target: %s (%v)", target, err)
2✔
691
                return !notMnt, err
2✔
692
        }
2✔
693
        return !notMnt, nil
5✔
694
}
695

696
func waitForMount(path string, intervel, timeout time.Duration) error {
2✔
697
        timeAfter := time.After(timeout)
2✔
698
        timeTick := time.Tick(intervel)
2✔
699

2✔
700
        for {
10✔
701
                select {
8✔
702
                case <-timeTick:
7✔
703
                        notMount, err := mount.New("").IsLikelyNotMountPoint(path)
7✔
704
                        if err != nil {
8✔
705
                                return err
1✔
706
                        }
1✔
707
                        if !notMount {
6✔
708
                                klog.V(2).Infof("blobfuse mount at %s success", path)
×
709
                                return nil
×
710
                        }
×
711
                case <-timeAfter:
1✔
712
                        return fmt.Errorf("timeout waiting for mount %s", path)
1✔
713
                }
714
        }
715
}
716

717
// checkGidPresentInMountFlags reports whether any entry of mountFlags
// contains the substring "gid=". A nil or empty slice yields false.
func checkGidPresentInMountFlags(mountFlags []string) bool {
	const gidOption = "gid="
	for i := 0; i < len(mountFlags); i++ {
		if !strings.Contains(mountFlags[i], gidOption) {
			continue
		}
		return true
	}
	return false
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc