• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubernetes-sigs / blob-csi-driver / 7412225870

04 Jan 2024 04:35PM UTC coverage: 77.871%. Remained the same
7412225870

Pull #1209

github

web-flow
chore(deps): bump golang.org/x/sync from 0.5.0 to 0.6.0

Bumps [golang.org/x/sync](https://github.com/golang/sync) from 0.5.0 to 0.6.0.
- [Commits](https://github.com/golang/sync/compare/v0.5.0...v0.6.0)

---
updated-dependencies:
- dependency-name: golang.org/x/sync
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Pull Request #1209: chore(deps): bump golang.org/x/sync from 0.5.0 to 0.6.0

2034 of 2612 relevant lines covered (77.87%)

7.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.05
/pkg/blob/nodeserver.go
1
/*
2
Copyright 2017 The Kubernetes Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package blob
18

19
import (
20
        "fmt"
21
        "io/fs"
22
        "os"
23
        "os/exec"
24
        "path/filepath"
25
        "strconv"
26
        "strings"
27
        "time"
28

29
        volumehelper "sigs.k8s.io/blob-csi-driver/pkg/util"
30
        azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache"
31
        "sigs.k8s.io/cloud-provider-azure/pkg/metrics"
32

33
        "github.com/Azure/azure-sdk-for-go/storage"
34
        "github.com/container-storage-interface/spec/lib/go/csi"
35

36
        "k8s.io/apimachinery/pkg/util/wait"
37
        "k8s.io/klog/v2"
38
        "k8s.io/kubernetes/pkg/volume"
39
        "k8s.io/kubernetes/pkg/volume/util"
40
        mount "k8s.io/mount-utils"
41

42
        "google.golang.org/grpc/codes"
43
        "google.golang.org/grpc/status"
44

45
        "golang.org/x/net/context"
46
        "google.golang.org/grpc"
47
        mount_azure_blob "sigs.k8s.io/blob-csi-driver/pkg/blobfuse-proxy/pb"
48
)
49

50
const (
51
        waitForMountInterval = 20 * time.Millisecond
52
        waitForMountTimeout  = 60 * time.Second
53
)
54

55
type MountClient struct {
56
        service mount_azure_blob.MountServiceClient
57
}
58

59
// NewMountClient returns a new mount client
60
func NewMountClient(cc *grpc.ClientConn) *MountClient {
1✔
61
        service := mount_azure_blob.NewMountServiceClient(cc)
1✔
62
        return &MountClient{service}
1✔
63
}
1✔
64

65
// NodePublishVolume mount the volume from staging to target path
66
func (d *Driver) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
8✔
67
        volCap := req.GetVolumeCapability()
8✔
68
        if volCap == nil {
9✔
69
                return nil, status.Error(codes.InvalidArgument, "Volume capability missing in request")
1✔
70
        }
1✔
71
        volumeID := req.GetVolumeId()
7✔
72
        if len(req.GetVolumeId()) == 0 {
8✔
73
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
74
        }
1✔
75

76
        target := req.GetTargetPath()
6✔
77
        if len(target) == 0 {
7✔
78
                return nil, status.Error(codes.InvalidArgument, "Target path not provided")
1✔
79
        }
1✔
80

81
        mountPermissions := d.mountPermissions
5✔
82
        context := req.GetVolumeContext()
5✔
83
        if context != nil {
7✔
84
                if strings.EqualFold(context[ephemeralField], trueValue) {
2✔
85
                        setKeyValueInMap(context, secretNamespaceField, context[podNamespaceField])
×
86
                        if !d.allowInlineVolumeKeyAccessWithIdentity {
×
87
                                // only get storage account from secret
×
88
                                setKeyValueInMap(context, getAccountKeyFromSecretField, trueValue)
×
89
                                setKeyValueInMap(context, storageAccountField, "")
×
90
                        }
×
91
                        klog.V(2).Infof("NodePublishVolume: ephemeral volume(%s) mount on %s, VolumeContext: %v", volumeID, target, context)
×
92
                        _, err := d.NodeStageVolume(ctx, &csi.NodeStageVolumeRequest{
×
93
                                StagingTargetPath: target,
×
94
                                VolumeContext:     context,
×
95
                                VolumeCapability:  volCap,
×
96
                                VolumeId:          volumeID,
×
97
                        })
×
98
                        return &csi.NodePublishVolumeResponse{}, err
×
99
                }
100

101
                if perm := getValueInMap(context, mountPermissionsField); perm != "" {
4✔
102
                        var err error
2✔
103
                        if mountPermissions, err = strconv.ParseUint(perm, 8, 32); err != nil {
3✔
104
                                return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid mountPermissions %s", perm))
1✔
105
                        }
1✔
106
                }
107
        }
108

109
        source := req.GetStagingTargetPath()
4✔
110
        if len(source) == 0 {
5✔
111
                return nil, status.Error(codes.InvalidArgument, "Staging target not provided")
1✔
112
        }
1✔
113

114
        mountOptions := []string{"bind"}
3✔
115
        if req.GetReadonly() {
6✔
116
                mountOptions = append(mountOptions, "ro")
3✔
117
        }
3✔
118

119
        mnt, err := d.ensureMountPoint(target, fs.FileMode(mountPermissions))
3✔
120
        if err != nil {
4✔
121
                return nil, status.Errorf(codes.Internal, "Could not mount target %q: %v", target, err)
1✔
122
        }
1✔
123
        if mnt {
2✔
124
                klog.V(2).Infof("NodePublishVolume: volume %s is already mounted on %s", volumeID, target)
×
125
                return &csi.NodePublishVolumeResponse{}, nil
×
126
        }
×
127

128
        klog.V(2).Infof("NodePublishVolume: volume %s mounting %s at %s with mountOptions: %v", volumeID, source, target, mountOptions)
2✔
129
        if d.enableBlobMockMount {
2✔
130
                klog.Warningf("NodePublishVolume: mock mount on volumeID(%s), this is only for TESTING!!!", volumeID)
×
131
                if err := volumehelper.MakeDir(target, os.FileMode(mountPermissions)); err != nil {
×
132
                        klog.Errorf("MakeDir failed on target: %s (%v)", target, err)
×
133
                        return nil, status.Errorf(codes.Internal, err.Error())
×
134
                }
×
135
                return &csi.NodePublishVolumeResponse{}, nil
×
136
        }
137

138
        if err := d.mounter.Mount(source, target, "", mountOptions); err != nil {
2✔
139
                if removeErr := os.Remove(target); removeErr != nil {
×
140
                        return nil, status.Errorf(codes.Internal, "Could not remove mount target %q: %v", target, removeErr)
×
141
                }
×
142
                return nil, status.Errorf(codes.Internal, "Could not mount %q at %q: %v", source, target, err)
×
143
        }
144
        klog.V(2).Infof("NodePublishVolume: volume %s mount %s at %s successfully", volumeID, source, target)
2✔
145

2✔
146
        return &csi.NodePublishVolumeResponse{}, nil
2✔
147
}
148

149
func (d *Driver) mountBlobfuseWithProxy(args, protocol string, authEnv []string) (string, error) {
1✔
150
        var resp *mount_azure_blob.MountAzureBlobResponse
1✔
151
        var output string
1✔
152
        connectionTimout := time.Duration(d.blobfuseProxyConnTimout) * time.Second
1✔
153
        ctx, cancel := context.WithTimeout(context.Background(), connectionTimout)
1✔
154
        defer cancel()
1✔
155
        klog.V(2).Infof("start connecting to blobfuse proxy, protocol: %s, args: %s", protocol, args)
1✔
156
        conn, err := grpc.DialContext(ctx, d.blobfuseProxyEndpoint, grpc.WithInsecure(), grpc.WithBlock())
1✔
157
        if err == nil {
1✔
158
                mountClient := NewMountClient(conn)
×
159
                mountreq := mount_azure_blob.MountAzureBlobRequest{
×
160
                        MountArgs: args,
×
161
                        Protocol:  protocol,
×
162
                        AuthEnv:   authEnv,
×
163
                }
×
164
                klog.V(2).Infof("begin to mount with blobfuse proxy, protocol: %s, args: %s", protocol, args)
×
165
                resp, err = mountClient.service.MountAzureBlob(context.TODO(), &mountreq)
×
166
                if err != nil {
×
167
                        klog.Error("GRPC call returned with an error:", err)
×
168
                }
×
169
                output = resp.GetOutput()
×
170
        }
171
        return output, err
1✔
172
}
173

174
func (d *Driver) mountBlobfuseInsideDriver(args string, protocol string, authEnv []string) (string, error) {
1✔
175
        var cmd *exec.Cmd
1✔
176

1✔
177
        args = volumehelper.TrimDuplicatedSpace(args)
1✔
178

1✔
179
        mountLog := "mount inside driver with"
1✔
180
        if protocol == Fuse2 {
1✔
181
                mountLog += " v2"
×
182
                args = "mount " + args
×
183
                cmd = exec.Command("blobfuse2", strings.Split(args, " ")...)
×
184
        } else {
1✔
185
                mountLog += " v1"
1✔
186
                cmd = exec.Command("blobfuse", strings.Split(args, " ")...)
1✔
187
        }
1✔
188
        klog.V(2).Infof("%s, protocol: %s, args: %s", mountLog, protocol, args)
1✔
189

1✔
190
        cmd.Env = append(os.Environ(), authEnv...)
1✔
191
        output, err := cmd.CombinedOutput()
1✔
192
        klog.V(2).Infof("mount output: %s\n", string(output))
1✔
193

1✔
194
        return string(output), err
1✔
195
}
196

197
// NodeUnpublishVolume unmount the volume from the target path
198
func (d *Driver) NodeUnpublishVolume(_ context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
3✔
199
        volumeID := req.GetVolumeId()
3✔
200
        if len(volumeID) == 0 {
4✔
201
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
202
        }
1✔
203
        targetPath := req.GetTargetPath()
2✔
204
        if len(targetPath) == 0 {
3✔
205
                return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
1✔
206
        }
1✔
207

208
        klog.V(2).Infof("NodeUnpublishVolume: unmounting volume %s on %s", volumeID, targetPath)
1✔
209
        err := mount.CleanupMountPoint(targetPath, d.mounter, true /*extensiveMountPointCheck*/)
1✔
210
        if err != nil {
1✔
211
                return nil, status.Errorf(codes.Internal, "failed to unmount target %q: %v", targetPath, err)
×
212
        }
×
213
        klog.V(2).Infof("NodeUnpublishVolume: unmount volume %s on %s successfully", volumeID, targetPath)
1✔
214

1✔
215
        return &csi.NodeUnpublishVolumeResponse{}, nil
1✔
216
}
217

218
// NodeStageVolume mount the volume to a staging path
219
func (d *Driver) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
10✔
220
        volumeID := req.GetVolumeId()
10✔
221
        if len(volumeID) == 0 {
11✔
222
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
223
        }
1✔
224
        targetPath := req.GetStagingTargetPath()
9✔
225
        if len(targetPath) == 0 {
10✔
226
                return nil, status.Error(codes.InvalidArgument, "Staging target not provided")
1✔
227
        }
1✔
228
        volumeCapability := req.GetVolumeCapability()
8✔
229
        if volumeCapability == nil {
9✔
230
                return nil, status.Error(codes.InvalidArgument, "Volume capability not provided")
1✔
231
        }
1✔
232

233
        lockKey := fmt.Sprintf("%s-%s", volumeID, targetPath)
7✔
234
        if acquired := d.volumeLocks.TryAcquire(lockKey); !acquired {
9✔
235
                return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
2✔
236
        }
2✔
237
        defer d.volumeLocks.Release(lockKey)
5✔
238

5✔
239
        mountFlags := req.GetVolumeCapability().GetMount().GetMountFlags()
5✔
240
        volumeMountGroup := req.GetVolumeCapability().GetMount().GetVolumeMountGroup()
5✔
241
        attrib := req.GetVolumeContext()
5✔
242
        secrets := req.GetSecrets()
5✔
243

5✔
244
        mc := metrics.NewMetricContext(blobCSIDriverName, "node_stage_volume", d.cloud.ResourceGroup, "", d.Name)
5✔
245
        isOperationSucceeded := false
5✔
246
        defer func() {
10✔
247
                mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
5✔
248
        }()
5✔
249

250
        var serverAddress, storageEndpointSuffix, protocol, ephemeralVolMountOptions string
5✔
251
        var ephemeralVol, isHnsEnabled bool
5✔
252

5✔
253
        containerNameReplaceMap := map[string]string{}
5✔
254

5✔
255
        fsGroupChangePolicy := d.fsGroupChangePolicy
5✔
256

5✔
257
        mountPermissions := d.mountPermissions
5✔
258
        performChmodOp := (mountPermissions > 0)
5✔
259
        for k, v := range attrib {
12✔
260
                switch strings.ToLower(k) {
7✔
261
                case serverNameField:
×
262
                        serverAddress = v
×
263
                case protocolField:
2✔
264
                        protocol = v
2✔
265
                case storageEndpointSuffixField:
×
266
                        storageEndpointSuffix = v
×
267
                case ephemeralField:
×
268
                        ephemeralVol = strings.EqualFold(v, trueValue)
×
269
                case mountOptionsField:
×
270
                        ephemeralVolMountOptions = v
×
271
                case isHnsEnabledField:
×
272
                        isHnsEnabled = strings.EqualFold(v, trueValue)
×
273
                case pvcNamespaceKey:
×
274
                        containerNameReplaceMap[pvcNamespaceMetadata] = v
×
275
                case pvcNameKey:
×
276
                        containerNameReplaceMap[pvcNameMetadata] = v
×
277
                case pvNameKey:
×
278
                        containerNameReplaceMap[pvNameMetadata] = v
×
279
                case mountPermissionsField:
4✔
280
                        if v != "" {
8✔
281
                                var err error
4✔
282
                                var perm uint64
4✔
283
                                if perm, err = strconv.ParseUint(v, 8, 32); err != nil {
5✔
284
                                        return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid mountPermissions %s", v))
1✔
285
                                }
1✔
286
                                if perm == 0 {
3✔
287
                                        performChmodOp = false
×
288
                                } else {
3✔
289
                                        mountPermissions = perm
3✔
290
                                }
3✔
291
                        }
292
                case fsGroupChangePolicyField:
1✔
293
                        fsGroupChangePolicy = v
1✔
294
                }
295
        }
296

297
        if !isSupportedFSGroupChangePolicy(fsGroupChangePolicy) {
5✔
298
                return nil, status.Errorf(codes.InvalidArgument, "fsGroupChangePolicy(%s) is not supported, supported fsGroupChangePolicy list: %v", fsGroupChangePolicy, supportedFSGroupChangePolicyList)
1✔
299
        }
1✔
300

301
        mnt, err := d.ensureMountPoint(targetPath, fs.FileMode(mountPermissions))
3✔
302
        if err != nil {
4✔
303
                return nil, status.Errorf(codes.Internal, "Could not mount target %q: %v", targetPath, err)
1✔
304
        }
1✔
305
        if mnt {
2✔
306
                klog.V(2).Infof("NodeStageVolume: volume %s is already mounted on %s", volumeID, targetPath)
×
307
                return &csi.NodeStageVolumeResponse{}, nil
×
308
        }
×
309

310
        _, accountName, _, containerName, authEnv, err := d.GetAuthEnv(ctx, volumeID, protocol, attrib, secrets)
2✔
311
        if err != nil {
2✔
312
                return nil, status.Errorf(codes.Internal, err.Error())
×
313
        }
×
314

315
        // replace pv/pvc name namespace metadata in subDir
316
        containerName = replaceWithMap(containerName, containerNameReplaceMap)
2✔
317

2✔
318
        if strings.TrimSpace(storageEndpointSuffix) == "" {
4✔
319
                if d.cloud.Environment.StorageEndpointSuffix != "" {
2✔
320
                        storageEndpointSuffix = d.cloud.Environment.StorageEndpointSuffix
×
321
                } else {
2✔
322
                        storageEndpointSuffix = storage.DefaultBaseURL
2✔
323
                }
2✔
324
        }
325

326
        if strings.TrimSpace(serverAddress) == "" {
4✔
327
                // server address is "accountname.blob.core.windows.net" by default
2✔
328
                serverAddress = fmt.Sprintf("%s.blob.%s", accountName, storageEndpointSuffix)
2✔
329
        }
2✔
330

331
        if isNFSProtocol(protocol) {
3✔
332
                klog.V(2).Infof("target %v\nprotocol %v\n\nvolumeId %v\ncontext %v\nmountflags %v\nserverAddress %v",
1✔
333
                        targetPath, protocol, volumeID, attrib, mountFlags, serverAddress)
1✔
334

1✔
335
                mountType := AZNFS
1✔
336
                if !d.enableAznfsMount {
2✔
337
                        mountType = NFS
1✔
338
                }
1✔
339

340
                source := fmt.Sprintf("%s:/%s/%s", serverAddress, accountName, containerName)
1✔
341
                mountOptions := util.JoinMountOptions(mountFlags, []string{"sec=sys,vers=3,nolock"})
1✔
342
                if err := wait.PollImmediate(1*time.Second, 2*time.Minute, func() (bool, error) {
2✔
343
                        return true, d.mounter.MountSensitive(source, targetPath, mountType, mountOptions, []string{})
1✔
344
                }); err != nil {
1✔
345
                        var helpLinkMsg string
×
346
                        if d.appendMountErrorHelpLink {
×
347
                                helpLinkMsg = "\nPlease refer to http://aka.ms/blobmounterror for possible causes and solutions for mount errors."
×
348
                        }
×
349
                        return nil, status.Error(codes.Internal, fmt.Sprintf("volume(%s) mount %q on %q failed with %v%s", volumeID, source, targetPath, err, helpLinkMsg))
×
350
                }
351

352
                if performChmodOp {
1✔
353
                        if err := chmodIfPermissionMismatch(targetPath, os.FileMode(mountPermissions)); err != nil {
×
354
                                return nil, status.Error(codes.Internal, err.Error())
×
355
                        }
×
356
                } else {
1✔
357
                        klog.V(2).Infof("skip chmod on targetPath(%s) since mountPermissions is set as 0", targetPath)
1✔
358
                }
1✔
359

360
                if volumeMountGroup != "" && fsGroupChangePolicy != FSGroupChangeNone {
1✔
361
                        klog.V(2).Infof("set gid of volume(%s) as %s using fsGroupChangePolicy(%s)", volumeID, volumeMountGroup, fsGroupChangePolicy)
×
362
                        if err := volumehelper.SetVolumeOwnership(targetPath, volumeMountGroup, fsGroupChangePolicy); err != nil {
×
363
                                return nil, status.Error(codes.Internal, fmt.Sprintf("SetVolumeOwnership with volume(%s) on %s failed with %v", volumeID, targetPath, err))
×
364
                        }
×
365
                }
366

367
                isOperationSucceeded = true
1✔
368
                klog.V(2).Infof("volume(%s) mount %s on %s succeeded", volumeID, source, targetPath)
1✔
369
                return &csi.NodeStageVolumeResponse{}, nil
1✔
370
        }
371

372
        // Get mountOptions that the volume will be formatted and mounted with
373
        mountOptions := mountFlags
1✔
374
        if ephemeralVol {
1✔
375
                mountOptions = util.JoinMountOptions(mountOptions, strings.Split(ephemeralVolMountOptions, ","))
×
376
        }
×
377
        if isHnsEnabled {
1✔
378
                mountOptions = util.JoinMountOptions(mountOptions, []string{"--use-adls=true"})
×
379
        }
×
380

381
        if !checkGidPresentInMountFlags(mountFlags) && volumeMountGroup != "" {
1✔
382
                klog.V(2).Infof("append volumeMountGroup %s", volumeMountGroup)
×
383
                mountOptions = append(mountOptions, fmt.Sprintf("-o gid=%s", volumeMountGroup))
×
384
        }
×
385

386
        tmpPath := fmt.Sprintf("%s/%s", "/mnt", volumeID)
1✔
387
        if d.appendTimeStampInCacheDir {
1✔
388
                tmpPath += fmt.Sprintf("#%d", time.Now().Unix())
×
389
        }
×
390
        mountOptions = appendDefaultMountOptions(mountOptions, tmpPath, containerName)
1✔
391

1✔
392
        args := targetPath
1✔
393
        for _, opt := range mountOptions {
7✔
394
                args = args + " " + opt
6✔
395
        }
6✔
396

397
        klog.V(2).Infof("target %v\nprotocol %v\n\nvolumeId %v\ncontext %v\nmountflags %v mountOptions %v volumeMountGroup %s\nargs %v\nserverAddress %v",
1✔
398
                targetPath, protocol, volumeID, attrib, mountFlags, mountOptions, volumeMountGroup, args, serverAddress)
1✔
399

1✔
400
        authEnv = append(authEnv, "AZURE_STORAGE_ACCOUNT="+accountName, "AZURE_STORAGE_BLOB_ENDPOINT="+serverAddress)
1✔
401
        if d.enableBlobMockMount {
2✔
402
                klog.Warningf("NodeStageVolume: mock mount on volumeID(%s), this is only for TESTING!!!", volumeID)
1✔
403
                if err := volumehelper.MakeDir(targetPath, os.FileMode(mountPermissions)); err != nil {
1✔
404
                        klog.Errorf("MakeDir failed on target: %s (%v)", targetPath, err)
×
405
                        return nil, status.Errorf(codes.Internal, err.Error())
×
406
                }
×
407
                return &csi.NodeStageVolumeResponse{}, nil
1✔
408
        }
409

410
        var output string
×
411
        if d.enableBlobfuseProxy {
×
412
                output, err = d.mountBlobfuseWithProxy(args, protocol, authEnv)
×
413
        } else {
×
414
                output, err = d.mountBlobfuseInsideDriver(args, protocol, authEnv)
×
415
        }
×
416

417
        if err != nil {
×
418
                var helpLinkMsg string
×
419
                if d.appendMountErrorHelpLink {
×
420
                        helpLinkMsg = "\nPlease refer to http://aka.ms/blobmounterror for possible causes and solutions for mount errors."
×
421
                }
×
422
                err = status.Errorf(codes.Internal, "Mount failed with error: %v, output: %v%s", err, output, helpLinkMsg)
×
423
                klog.Errorf("%v", err)
×
424
                notMnt, mntErr := d.mounter.IsLikelyNotMountPoint(targetPath)
×
425
                if mntErr != nil {
×
426
                        klog.Errorf("IsLikelyNotMountPoint check failed: %v", mntErr)
×
427
                        return nil, err
×
428
                }
×
429
                if !notMnt {
×
430
                        if mntErr = d.mounter.Unmount(targetPath); mntErr != nil {
×
431
                                klog.Errorf("Failed to unmount: %v", mntErr)
×
432
                                return nil, err
×
433
                        }
×
434
                        notMnt, mntErr := d.mounter.IsLikelyNotMountPoint(targetPath)
×
435
                        if mntErr != nil {
×
436
                                klog.Errorf("IsLikelyNotMountPoint check failed: %v", mntErr)
×
437
                                return nil, err
×
438
                        }
×
439
                        if !notMnt {
×
440
                                // This is very odd, we don't expect it.  We'll try again next sync loop.
×
441
                                klog.Errorf("%s is still mounted, despite call to unmount().  Will try again next sync loop.", targetPath)
×
442
                                return nil, err
×
443
                        }
×
444
                }
445
                os.Remove(targetPath)
×
446
                return nil, err
×
447
        }
448

449
        // wait a few seconds to make sure blobfuse mount is successful
450
        // please refer to https://github.com/Azure/azure-storage-fuse/pull/1088 for more details
451
        if err := waitForMount(targetPath, waitForMountInterval, waitForMountTimeout); err != nil {
×
452
                return nil, fmt.Errorf("failed to wait for mount: %w", err)
×
453
        }
×
454

455
        klog.V(2).Infof("volume(%s) mount on %q succeeded", volumeID, targetPath)
×
456
        return &csi.NodeStageVolumeResponse{}, nil
×
457
}
458

459
// NodeUnstageVolume unmount the volume from the staging path
460
func (d *Driver) NodeUnstageVolume(_ context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
3✔
461
        volumeID := req.GetVolumeId()
3✔
462
        if len(volumeID) == 0 {
4✔
463
                return nil, status.Error(codes.InvalidArgument, "Volume ID not provided")
1✔
464
        }
1✔
465

466
        stagingTargetPath := req.GetStagingTargetPath()
2✔
467
        if len(stagingTargetPath) == 0 {
3✔
468
                return nil, status.Error(codes.InvalidArgument, "Staging target not provided")
1✔
469
        }
1✔
470

471
        lockKey := fmt.Sprintf("%s-%s", volumeID, stagingTargetPath)
1✔
472
        if acquired := d.volumeLocks.TryAcquire(lockKey); !acquired {
1✔
473
                return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
×
474
        }
×
475
        defer d.volumeLocks.Release(lockKey)
1✔
476

1✔
477
        mc := metrics.NewMetricContext(blobCSIDriverName, "node_unstage_volume", d.cloud.ResourceGroup, "", d.Name)
1✔
478
        isOperationSucceeded := false
1✔
479
        defer func() {
2✔
480
                mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
1✔
481
        }()
1✔
482

483
        klog.V(2).Infof("NodeUnstageVolume: volume %s unmounting on %s", volumeID, stagingTargetPath)
1✔
484
        err := mount.CleanupMountPoint(stagingTargetPath, d.mounter, true /*extensiveMountPointCheck*/)
1✔
485
        if err != nil {
1✔
486
                return nil, status.Errorf(codes.Internal, "failed to unmount staging target %q: %v", stagingTargetPath, err)
×
487
        }
×
488
        klog.V(2).Infof("NodeUnstageVolume: volume %s unmount on %s successfully", volumeID, stagingTargetPath)
1✔
489

1✔
490
        isOperationSucceeded = true
1✔
491
        return &csi.NodeUnstageVolumeResponse{}, nil
1✔
492
}
493

494
// NodeGetCapabilities return the capabilities of the Node plugin
495
func (d *Driver) NodeGetCapabilities(_ context.Context, _ *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
1✔
496
        return &csi.NodeGetCapabilitiesResponse{
1✔
497
                Capabilities: d.NSCap,
1✔
498
        }, nil
1✔
499
}
1✔
500

501
// NodeGetInfo return info of the node on which this plugin is running
502
func (d *Driver) NodeGetInfo(_ context.Context, _ *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
1✔
503
        return &csi.NodeGetInfoResponse{
1✔
504
                NodeId: d.NodeID,
1✔
505
        }, nil
1✔
506
}
1✔
507

508
// NodeExpandVolume node expand volume
509
func (d *Driver) NodeExpandVolume(_ context.Context, _ *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
1✔
510
        return nil, status.Error(codes.Unimplemented, "NodeExpandVolume is not yet implemented")
1✔
511
}
1✔
512

513
// NodeGetVolumeStats get volume stats
514
func (d *Driver) NodeGetVolumeStats(_ context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
4✔
515
        if len(req.VolumeId) == 0 {
5✔
516
                return nil, status.Error(codes.InvalidArgument, "NodeGetVolumeStats volume ID was empty")
1✔
517
        }
1✔
518
        if len(req.VolumePath) == 0 {
4✔
519
                return nil, status.Error(codes.InvalidArgument, "NodeGetVolumeStats volume path was empty")
1✔
520
        }
1✔
521

522
        // check if the volume stats is cached
523
        cache, err := d.volStatsCache.Get(req.VolumeId, azcache.CacheReadTypeDefault)
2✔
524
        if err != nil {
2✔
525
                return nil, status.Errorf(codes.Internal, err.Error())
×
526
        }
×
527
        if cache != nil {
2✔
528
                resp := cache.(csi.NodeGetVolumeStatsResponse)
×
529
                klog.V(6).Infof("NodeGetVolumeStats: volume stats for volume %s path %s is cached", req.VolumeId, req.VolumePath)
×
530
                return &resp, nil
×
531
        }
×
532

533
        mc := metrics.NewMetricContext(blobCSIDriverName, "node_get_volume_stats", d.cloud.ResourceGroup, "", d.Name)
2✔
534
        isOperationSucceeded := false
2✔
535
        defer func() {
4✔
536
                mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, req.VolumeId)
2✔
537
        }()
2✔
538

539
        if _, err := os.Lstat(req.VolumePath); err != nil {
3✔
540
                if os.IsNotExist(err) {
2✔
541
                        return nil, status.Errorf(codes.NotFound, "path %s does not exist", req.VolumePath)
1✔
542
                }
1✔
543
                return nil, status.Errorf(codes.Internal, "failed to stat file %s: %v", req.VolumePath, err)
×
544
        }
545

546
        klog.V(6).Infof("NodeGetVolumeStats: begin to get VolumeStats on volume %s path %s", req.VolumeId, req.VolumePath)
1✔
547

1✔
548
        volumeMetrics, err := volume.NewMetricsStatFS(req.VolumePath).GetMetrics()
1✔
549
        if err != nil {
1✔
550
                return nil, status.Errorf(codes.Internal, "failed to get metrics: %v", err)
×
551
        }
×
552

553
        available, ok := volumeMetrics.Available.AsInt64()
1✔
554
        if !ok {
1✔
555
                return nil, status.Errorf(codes.Internal, "failed to transform volume available size(%v)", volumeMetrics.Available)
×
556
        }
×
557
        capacity, ok := volumeMetrics.Capacity.AsInt64()
1✔
558
        if !ok {
1✔
559
                return nil, status.Errorf(codes.Internal, "failed to transform volume capacity size(%v)", volumeMetrics.Capacity)
×
560
        }
×
561
        used, ok := volumeMetrics.Used.AsInt64()
1✔
562
        if !ok {
1✔
563
                return nil, status.Errorf(codes.Internal, "failed to transform volume used size(%v)", volumeMetrics.Used)
×
564
        }
×
565

566
        inodesFree, ok := volumeMetrics.InodesFree.AsInt64()
1✔
567
        if !ok {
1✔
568
                return nil, status.Errorf(codes.Internal, "failed to transform disk inodes free(%v)", volumeMetrics.InodesFree)
×
569
        }
×
570
        inodes, ok := volumeMetrics.Inodes.AsInt64()
1✔
571
        if !ok {
1✔
572
                return nil, status.Errorf(codes.Internal, "failed to transform disk inodes(%v)", volumeMetrics.Inodes)
×
573
        }
×
574
        inodesUsed, ok := volumeMetrics.InodesUsed.AsInt64()
1✔
575
        if !ok {
1✔
576
                return nil, status.Errorf(codes.Internal, "failed to transform disk inodes used(%v)", volumeMetrics.InodesUsed)
×
577
        }
×
578

579
        resp := &csi.NodeGetVolumeStatsResponse{
1✔
580
                Usage: []*csi.VolumeUsage{
1✔
581
                        {
1✔
582
                                Unit:      csi.VolumeUsage_BYTES,
1✔
583
                                Available: available,
1✔
584
                                Total:     capacity,
1✔
585
                                Used:      used,
1✔
586
                        },
1✔
587
                        {
1✔
588
                                Unit:      csi.VolumeUsage_INODES,
1✔
589
                                Available: inodesFree,
1✔
590
                                Total:     inodes,
1✔
591
                                Used:      inodesUsed,
1✔
592
                        },
1✔
593
                },
1✔
594
        }
1✔
595

1✔
596
        isOperationSucceeded = true
1✔
597
        klog.V(6).Infof("NodeGetVolumeStats: volume stats for volume %s path %s is %v", req.VolumeId, req.VolumePath, resp)
1✔
598
        // cache the volume stats per volume
1✔
599
        d.volStatsCache.Set(req.VolumeId, *resp)
1✔
600
        return resp, nil
1✔
601
}
602

603
// ensureMountPoint: create mount point if not exists
604
// return <true, nil> if it's already a mounted point otherwise return <false, nil>
605
func (d *Driver) ensureMountPoint(target string, perm os.FileMode) (bool, error) {
11✔
606
        notMnt, err := d.mounter.IsLikelyNotMountPoint(target)
11✔
607
        if err != nil && !os.IsNotExist(err) {
13✔
608
                if IsCorruptedDir(target) {
2✔
609
                        notMnt = false
×
610
                        klog.Warningf("detected corrupted mount for targetPath [%s]", target)
×
611
                } else {
2✔
612
                        return !notMnt, err
2✔
613
                }
2✔
614
        }
615

616
        // Check all the mountpoints in case IsLikelyNotMountPoint
617
        // cannot handle --bind mount
618
        mountList, err := d.mounter.List()
9✔
619
        if err != nil {
9✔
620
                return !notMnt, err
×
621
        }
×
622

623
        targetAbs, err := filepath.Abs(target)
9✔
624
        if err != nil {
9✔
625
                return !notMnt, err
×
626
        }
×
627

628
        for _, mountPoint := range mountList {
9✔
629
                if mountPoint.Path == targetAbs {
×
630
                        notMnt = false
×
631
                        break
×
632
                }
633
        }
634

635
        if !notMnt {
11✔
636
                // testing original mount point, make sure the mount link is valid
2✔
637
                _, err := os.ReadDir(target)
2✔
638
                if err == nil {
3✔
639
                        klog.V(2).Infof("already mounted to target %s", target)
1✔
640
                        return !notMnt, nil
1✔
641
                }
1✔
642
                // mount link is invalid, now unmount and remount later
643
                klog.Warningf("ReadDir %s failed with %v, unmount this directory", target, err)
1✔
644
                if err := d.mounter.Unmount(target); err != nil {
1✔
645
                        klog.Errorf("Unmount directory %s failed with %v", target, err)
×
646
                        return !notMnt, err
×
647
                }
×
648
                notMnt = true
1✔
649
                return !notMnt, err
1✔
650
        }
651
        if err := volumehelper.MakeDir(target, perm); err != nil {
9✔
652
                klog.Errorf("MakeDir failed on target: %s (%v)", target, err)
2✔
653
                return !notMnt, err
2✔
654
        }
2✔
655
        return !notMnt, nil
5✔
656
}
657

658
func waitForMount(path string, intervel, timeout time.Duration) error {
2✔
659
        timeAfter := time.After(timeout)
2✔
660
        timeTick := time.Tick(intervel)
2✔
661

2✔
662
        for {
12✔
663
                select {
10✔
664
                case <-timeTick:
9✔
665
                        notMount, err := mount.New("").IsLikelyNotMountPoint(path)
9✔
666
                        if err != nil {
10✔
667
                                return err
1✔
668
                        }
1✔
669
                        if !notMount {
8✔
670
                                klog.V(2).Infof("blobfuse mount at %s success", path)
×
671
                                return nil
×
672
                        }
×
673
                case <-timeAfter:
1✔
674
                        return fmt.Errorf("timeout waiting for mount %s", path)
1✔
675
                }
676
        }
677
}
678

679
func checkGidPresentInMountFlags(mountFlags []string) bool {
4✔
680
        for _, mountFlag := range mountFlags {
6✔
681
                if strings.Contains(mountFlag, "gid=") {
4✔
682
                        return true
2✔
683
                }
2✔
684
        }
685
        return false
2✔
686
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc