• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

kubernetes-sigs / blob-csi-driver / 8603508478

08 Apr 2024 04:26PM UTC coverage: 74.428%. Remained the same
8603508478

Pull #1339

github

web-flow
chore(deps): bump golang.org/x/net from 0.22.0 to 0.24.0

Bumps [golang.org/x/net](https://github.com/golang/net) from 0.22.0 to 0.24.0.
- [Commits](https://github.com/golang/net/compare/v0.22.0...v0.24.0)

---
updated-dependencies:
- dependency-name: golang.org/x/net
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Pull Request #1339: chore(deps): bump golang.org/x/net from 0.22.0 to 0.24.0

2212 of 2972 relevant lines covered (74.43%)

7.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.73
/pkg/blob/nodeserver.go
1
/*
2
Copyright 2017 The Kubernetes Authors.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package blob
18

19
import (
20
        "fmt"
21
        "io/fs"
22
        "os"
23
        "os/exec"
24
        "path/filepath"
25
        "strconv"
26
        "strings"
27
        "time"
28

29
        volumehelper "sigs.k8s.io/blob-csi-driver/pkg/util"
30
        azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache"
31
        "sigs.k8s.io/cloud-provider-azure/pkg/metrics"
32

33
        "github.com/container-storage-interface/spec/lib/go/csi"
34

35
        "k8s.io/klog/v2"
36
        "k8s.io/kubernetes/pkg/volume"
37
        "k8s.io/kubernetes/pkg/volume/util"
38
        mount "k8s.io/mount-utils"
39

40
        "google.golang.org/grpc/codes"
41
        "google.golang.org/grpc/status"
42

43
        "golang.org/x/net/context"
44
        "google.golang.org/grpc"
45
        mount_azure_blob "sigs.k8s.io/blob-csi-driver/pkg/blobfuse-proxy/pb"
46
)
47

48
// Polling parameters handed to waitForMount once a blobfuse mount command has
// returned.
const (
	// waitForMountInterval is the interval argument passed to waitForMount.
	waitForMountInterval = 20 * time.Millisecond
	// waitForMountTimeout is the timeout argument passed to waitForMount.
	waitForMountTimeout = time.Minute
)
52

53
type MountClient struct {
54
        service mount_azure_blob.MountServiceClient
55
}
56

57
// NewMountClient returns a new mount client
58
func NewMountClient(cc *grpc.ClientConn) *MountClient {
1✔
59
        service := mount_azure_blob.NewMountServiceClient(cc)
1✔
60
        return &MountClient{service}
1✔
61
}
1✔
62

63
// NodePublishVolume mount the volume from staging to target path
64
func (d *Driver) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
8✔
65
        volCap := req.GetVolumeCapability()
8✔
66
        if volCap == nil {
9✔
67
                return nil, status.Error(codes.InvalidArgument, "Volume capability missing in request")
1✔
68
        }
1✔
69
        volumeID := req.GetVolumeId()
7✔
70
        if len(req.GetVolumeId()) == 0 {
8✔
71
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
72
        }
1✔
73

74
        target := req.GetTargetPath()
6✔
75
        if len(target) == 0 {
7✔
76
                return nil, status.Error(codes.InvalidArgument, "Target path not provided")
1✔
77
        }
1✔
78

79
        mountPermissions := d.mountPermissions
5✔
80
        context := req.GetVolumeContext()
5✔
81
        if context != nil {
7✔
82
                // token request
2✔
83
                if context[serviceAccountTokenField] != "" && getValueInMap(context, clientIDField) != "" {
2✔
84
                        klog.V(2).Infof("NodePublishVolume: volume(%s) mount on %s with service account token, clientID: %s", volumeID, target, getValueInMap(context, clientIDField))
×
85
                        _, err := d.NodeStageVolume(ctx, &csi.NodeStageVolumeRequest{
×
86
                                StagingTargetPath: target,
×
87
                                VolumeContext:     context,
×
88
                                VolumeCapability:  volCap,
×
89
                                VolumeId:          volumeID,
×
90
                        })
×
91
                        return &csi.NodePublishVolumeResponse{}, err
×
92
                }
×
93

94
                // ephemeral volume
95
                if strings.EqualFold(context[ephemeralField], trueValue) {
2✔
96
                        setKeyValueInMap(context, secretNamespaceField, context[podNamespaceField])
×
97
                        if !d.allowInlineVolumeKeyAccessWithIdentity {
×
98
                                // only get storage account from secret
×
99
                                setKeyValueInMap(context, getAccountKeyFromSecretField, trueValue)
×
100
                                setKeyValueInMap(context, storageAccountField, "")
×
101
                        }
×
102
                        klog.V(2).Infof("NodePublishVolume: ephemeral volume(%s) mount on %s, VolumeContext: %v", volumeID, target, context)
×
103
                        _, err := d.NodeStageVolume(ctx, &csi.NodeStageVolumeRequest{
×
104
                                StagingTargetPath: target,
×
105
                                VolumeContext:     context,
×
106
                                VolumeCapability:  volCap,
×
107
                                VolumeId:          volumeID,
×
108
                        })
×
109
                        return &csi.NodePublishVolumeResponse{}, err
×
110
                }
111

112
                if perm := getValueInMap(context, mountPermissionsField); perm != "" {
4✔
113
                        var err error
2✔
114
                        if mountPermissions, err = strconv.ParseUint(perm, 8, 32); err != nil {
3✔
115
                                return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid mountPermissions %s", perm))
1✔
116
                        }
1✔
117
                }
118
        }
119

120
        source := req.GetStagingTargetPath()
4✔
121
        if len(source) == 0 {
5✔
122
                return nil, status.Error(codes.InvalidArgument, "Staging target not provided")
1✔
123
        }
1✔
124

125
        mountOptions := []string{"bind"}
3✔
126
        if req.GetReadonly() {
6✔
127
                mountOptions = append(mountOptions, "ro")
3✔
128
        }
3✔
129

130
        mnt, err := d.ensureMountPoint(target, fs.FileMode(mountPermissions))
3✔
131
        if err != nil {
4✔
132
                return nil, status.Errorf(codes.Internal, "Could not mount target %q: %v", target, err)
1✔
133
        }
1✔
134
        if mnt {
2✔
135
                klog.V(2).Infof("NodePublishVolume: volume %s is already mounted on %s", volumeID, target)
×
136
                return &csi.NodePublishVolumeResponse{}, nil
×
137
        }
×
138

139
        klog.V(2).Infof("NodePublishVolume: volume %s mounting %s at %s with mountOptions: %v", volumeID, source, target, mountOptions)
2✔
140
        if d.enableBlobMockMount {
2✔
141
                klog.Warningf("NodePublishVolume: mock mount on volumeID(%s), this is only for TESTING!!!", volumeID)
×
142
                if err := volumehelper.MakeDir(target, os.FileMode(mountPermissions)); err != nil {
×
143
                        klog.Errorf("MakeDir failed on target: %s (%v)", target, err)
×
144
                        return nil, status.Errorf(codes.Internal, err.Error())
×
145
                }
×
146
                return &csi.NodePublishVolumeResponse{}, nil
×
147
        }
148

149
        if err := d.mounter.Mount(source, target, "", mountOptions); err != nil {
2✔
150
                if removeErr := os.Remove(target); removeErr != nil {
×
151
                        return nil, status.Errorf(codes.Internal, "Could not remove mount target %q: %v", target, removeErr)
×
152
                }
×
153
                return nil, status.Errorf(codes.Internal, "Could not mount %q at %q: %v", source, target, err)
×
154
        }
155
        klog.V(2).Infof("NodePublishVolume: volume %s mount %s at %s successfully", volumeID, source, target)
2✔
156

2✔
157
        return &csi.NodePublishVolumeResponse{}, nil
2✔
158
}
159

160
func (d *Driver) mountBlobfuseWithProxy(args, protocol string, authEnv []string) (string, error) {
1✔
161
        var resp *mount_azure_blob.MountAzureBlobResponse
1✔
162
        var output string
1✔
163
        connectionTimout := time.Duration(d.blobfuseProxyConnTimout) * time.Second
1✔
164
        ctx, cancel := context.WithTimeout(context.Background(), connectionTimout)
1✔
165
        defer cancel()
1✔
166
        klog.V(2).Infof("start connecting to blobfuse proxy, protocol: %s, args: %s", protocol, args)
1✔
167
        conn, err := grpc.DialContext(ctx, d.blobfuseProxyEndpoint, grpc.WithInsecure(), grpc.WithBlock())
1✔
168
        if err == nil {
1✔
169
                mountClient := NewMountClient(conn)
×
170
                mountreq := mount_azure_blob.MountAzureBlobRequest{
×
171
                        MountArgs: args,
×
172
                        Protocol:  protocol,
×
173
                        AuthEnv:   authEnv,
×
174
                }
×
175
                klog.V(2).Infof("begin to mount with blobfuse proxy, protocol: %s, args: %s", protocol, args)
×
176
                resp, err = mountClient.service.MountAzureBlob(context.TODO(), &mountreq)
×
177
                if err != nil {
×
178
                        klog.Error("GRPC call returned with an error:", err)
×
179
                }
×
180
                output = resp.GetOutput()
×
181
        }
182
        return output, err
1✔
183
}
184

185
func (d *Driver) mountBlobfuseInsideDriver(args string, protocol string, authEnv []string) (string, error) {
1✔
186
        var cmd *exec.Cmd
1✔
187

1✔
188
        args = volumehelper.TrimDuplicatedSpace(args)
1✔
189

1✔
190
        mountLog := "mount inside driver with"
1✔
191
        if protocol == Fuse2 {
1✔
192
                mountLog += " v2"
×
193
                args = "mount " + args
×
194
                cmd = exec.Command("blobfuse2", strings.Split(args, " ")...)
×
195
        } else {
1✔
196
                mountLog += " v1"
1✔
197
                cmd = exec.Command("blobfuse", strings.Split(args, " ")...)
1✔
198
        }
1✔
199
        klog.V(2).Infof("%s, protocol: %s, args: %s", mountLog, protocol, args)
1✔
200

1✔
201
        cmd.Env = append(os.Environ(), authEnv...)
1✔
202
        output, err := cmd.CombinedOutput()
1✔
203
        klog.V(2).Infof("mount output: %s\n", string(output))
1✔
204

1✔
205
        return string(output), err
1✔
206
}
207

208
// NodeUnpublishVolume unmount the volume from the target path
209
func (d *Driver) NodeUnpublishVolume(_ context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
3✔
210
        volumeID := req.GetVolumeId()
3✔
211
        if len(volumeID) == 0 {
4✔
212
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
213
        }
1✔
214
        targetPath := req.GetTargetPath()
2✔
215
        if len(targetPath) == 0 {
3✔
216
                return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
1✔
217
        }
1✔
218

219
        klog.V(2).Infof("NodeUnpublishVolume: unmounting volume %s on %s", volumeID, targetPath)
1✔
220
        err := mount.CleanupMountPoint(targetPath, d.mounter, true /*extensiveMountPointCheck*/)
1✔
221
        if err != nil {
1✔
222
                return nil, status.Errorf(codes.Internal, "failed to unmount target %q: %v", targetPath, err)
×
223
        }
×
224
        klog.V(2).Infof("NodeUnpublishVolume: unmount volume %s on %s successfully", volumeID, targetPath)
1✔
225

1✔
226
        return &csi.NodeUnpublishVolumeResponse{}, nil
1✔
227
}
228

229
// NodeStageVolume mount the volume to a staging path
230
func (d *Driver) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
10✔
231
        volumeID := req.GetVolumeId()
10✔
232
        if len(volumeID) == 0 {
11✔
233
                return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
1✔
234
        }
1✔
235
        targetPath := req.GetStagingTargetPath()
9✔
236
        if len(targetPath) == 0 {
10✔
237
                return nil, status.Error(codes.InvalidArgument, "Staging target not provided")
1✔
238
        }
1✔
239
        volumeCapability := req.GetVolumeCapability()
8✔
240
        if volumeCapability == nil {
9✔
241
                return nil, status.Error(codes.InvalidArgument, "Volume capability not provided")
1✔
242
        }
1✔
243

244
        lockKey := fmt.Sprintf("%s-%s", volumeID, targetPath)
7✔
245
        if acquired := d.volumeLocks.TryAcquire(lockKey); !acquired {
9✔
246
                return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
2✔
247
        }
2✔
248
        defer d.volumeLocks.Release(lockKey)
5✔
249

5✔
250
        mountFlags := req.GetVolumeCapability().GetMount().GetMountFlags()
5✔
251
        volumeMountGroup := req.GetVolumeCapability().GetMount().GetVolumeMountGroup()
5✔
252
        attrib := req.GetVolumeContext()
5✔
253
        secrets := req.GetSecrets()
5✔
254

5✔
255
        if getValueInMap(attrib, clientIDField) != "" && attrib[serviceAccountTokenField] == "" {
5✔
256
                klog.V(2).Infof("Skip NodeStageVolume for volume(%s) since clientID %s is provided but service account token is empty", volumeID, getValueInMap(attrib, clientIDField))
×
257
                return &csi.NodeStageVolumeResponse{}, nil
×
258
        }
×
259

260
        mc := metrics.NewMetricContext(blobCSIDriverName, "node_stage_volume", d.cloud.ResourceGroup, "", d.Name)
5✔
261
        isOperationSucceeded := false
5✔
262
        defer func() {
10✔
263
                mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
5✔
264
        }()
5✔
265

266
        var serverAddress, storageEndpointSuffix, protocol, ephemeralVolMountOptions string
5✔
267
        var ephemeralVol, isHnsEnabled bool
5✔
268

5✔
269
        containerNameReplaceMap := map[string]string{}
5✔
270

5✔
271
        fsGroupChangePolicy := d.fsGroupChangePolicy
5✔
272

5✔
273
        mountPermissions := d.mountPermissions
5✔
274
        performChmodOp := (mountPermissions > 0)
5✔
275
        for k, v := range attrib {
12✔
276
                switch strings.ToLower(k) {
7✔
277
                case serverNameField:
×
278
                        serverAddress = v
×
279
                case protocolField:
2✔
280
                        protocol = v
2✔
281
                case storageEndpointSuffixField:
×
282
                        storageEndpointSuffix = v
×
283
                case ephemeralField:
×
284
                        ephemeralVol = strings.EqualFold(v, trueValue)
×
285
                case mountOptionsField:
×
286
                        ephemeralVolMountOptions = v
×
287
                case isHnsEnabledField:
×
288
                        isHnsEnabled = strings.EqualFold(v, trueValue)
×
289
                case pvcNamespaceKey:
×
290
                        containerNameReplaceMap[pvcNamespaceMetadata] = v
×
291
                case pvcNameKey:
×
292
                        containerNameReplaceMap[pvcNameMetadata] = v
×
293
                case pvNameKey:
×
294
                        containerNameReplaceMap[pvNameMetadata] = v
×
295
                case mountPermissionsField:
4✔
296
                        if v != "" {
8✔
297
                                var err error
4✔
298
                                var perm uint64
4✔
299
                                if perm, err = strconv.ParseUint(v, 8, 32); err != nil {
5✔
300
                                        return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("invalid mountPermissions %s", v))
1✔
301
                                }
1✔
302
                                if perm == 0 {
3✔
303
                                        performChmodOp = false
×
304
                                } else {
3✔
305
                                        mountPermissions = perm
3✔
306
                                }
3✔
307
                        }
308
                case fsGroupChangePolicyField:
1✔
309
                        fsGroupChangePolicy = v
1✔
310
                }
311
        }
312

313
        if !isSupportedFSGroupChangePolicy(fsGroupChangePolicy) {
5✔
314
                return nil, status.Errorf(codes.InvalidArgument, "fsGroupChangePolicy(%s) is not supported, supported fsGroupChangePolicy list: %v", fsGroupChangePolicy, supportedFSGroupChangePolicyList)
1✔
315
        }
1✔
316

317
        mnt, err := d.ensureMountPoint(targetPath, fs.FileMode(mountPermissions))
3✔
318
        if err != nil {
4✔
319
                return nil, status.Errorf(codes.Internal, "Could not mount target %q: %v", targetPath, err)
1✔
320
        }
1✔
321
        if mnt {
2✔
322
                klog.V(2).Infof("NodeStageVolume: volume %s is already mounted on %s", volumeID, targetPath)
×
323
                return &csi.NodeStageVolumeResponse{}, nil
×
324
        }
×
325

326
        _, accountName, _, containerName, authEnv, err := d.GetAuthEnv(ctx, volumeID, protocol, attrib, secrets)
2✔
327
        if err != nil {
2✔
328
                return nil, status.Errorf(codes.Internal, err.Error())
×
329
        }
×
330

331
        // replace pv/pvc name namespace metadata in subDir
332
        containerName = replaceWithMap(containerName, containerNameReplaceMap)
2✔
333

2✔
334
        if strings.TrimSpace(storageEndpointSuffix) == "" {
4✔
335
                storageEndpointSuffix = d.getStorageEndPointSuffix()
2✔
336
        }
2✔
337

338
        if strings.TrimSpace(serverAddress) == "" {
4✔
339
                // server address is "accountname.blob.core.windows.net" by default
2✔
340
                serverAddress = fmt.Sprintf("%s.blob.%s", accountName, storageEndpointSuffix)
2✔
341
        }
2✔
342

343
        if isNFSProtocol(protocol) {
3✔
344
                klog.V(2).Infof("target %v\nprotocol %v\n\nvolumeId %v\ncontext %v\nmountflags %v\nserverAddress %v",
1✔
345
                        targetPath, protocol, volumeID, attrib, mountFlags, serverAddress)
1✔
346

1✔
347
                mountType := AZNFS
1✔
348
                if !d.enableAznfsMount || protocol == NFSv3 {
2✔
349
                        mountType = NFS
1✔
350
                }
1✔
351

352
                source := fmt.Sprintf("%s:/%s/%s", serverAddress, accountName, containerName)
1✔
353
                mountOptions := util.JoinMountOptions(mountFlags, []string{"sec=sys,vers=3,nolock"})
1✔
354
                execFunc := func() error { return d.mounter.MountSensitive(source, targetPath, mountType, mountOptions, []string{}) }
2✔
355
                timeoutFunc := func() error { return fmt.Errorf("time out") }
1✔
356
                if err := volumehelper.WaitUntilTimeout(2*time.Minute, execFunc, timeoutFunc); err != nil {
1✔
357
                        var helpLinkMsg string
×
358
                        if d.appendMountErrorHelpLink {
×
359
                                helpLinkMsg = "\nPlease refer to http://aka.ms/blobmounterror for possible causes and solutions for mount errors."
×
360
                        }
×
361
                        return nil, status.Error(codes.Internal, fmt.Sprintf("volume(%s) mount %q on %q failed with %v%s", volumeID, source, targetPath, err, helpLinkMsg))
×
362
                }
363

364
                if performChmodOp {
1✔
365
                        if err := chmodIfPermissionMismatch(targetPath, os.FileMode(mountPermissions)); err != nil {
×
366
                                return nil, status.Error(codes.Internal, err.Error())
×
367
                        }
×
368
                } else {
1✔
369
                        klog.V(2).Infof("skip chmod on targetPath(%s) since mountPermissions is set as 0", targetPath)
1✔
370
                }
1✔
371

372
                if volumeMountGroup != "" && fsGroupChangePolicy != FSGroupChangeNone {
1✔
373
                        klog.V(2).Infof("set gid of volume(%s) as %s using fsGroupChangePolicy(%s)", volumeID, volumeMountGroup, fsGroupChangePolicy)
×
374
                        if err := volumehelper.SetVolumeOwnership(targetPath, volumeMountGroup, fsGroupChangePolicy); err != nil {
×
375
                                return nil, status.Error(codes.Internal, fmt.Sprintf("SetVolumeOwnership with volume(%s) on %s failed with %v", volumeID, targetPath, err))
×
376
                        }
×
377
                }
378

379
                isOperationSucceeded = true
1✔
380
                klog.V(2).Infof("volume(%s) mount %s on %s succeeded", volumeID, source, targetPath)
1✔
381
                return &csi.NodeStageVolumeResponse{}, nil
1✔
382
        }
383

384
        // Get mountOptions that the volume will be formatted and mounted with
385
        mountOptions := mountFlags
1✔
386
        if ephemeralVol {
1✔
387
                mountOptions = util.JoinMountOptions(mountOptions, strings.Split(ephemeralVolMountOptions, ","))
×
388
        }
×
389
        if isHnsEnabled {
1✔
390
                mountOptions = util.JoinMountOptions(mountOptions, []string{"--use-adls=true"})
×
391
        }
×
392

393
        if !checkGidPresentInMountFlags(mountFlags) && volumeMountGroup != "" {
1✔
394
                klog.V(2).Infof("append volumeMountGroup %s", volumeMountGroup)
×
395
                mountOptions = append(mountOptions, fmt.Sprintf("-o gid=%s", volumeMountGroup))
×
396
        }
×
397

398
        tmpPath := fmt.Sprintf("%s/%s", "/mnt", volumeID)
1✔
399
        if d.appendTimeStampInCacheDir {
1✔
400
                tmpPath += fmt.Sprintf("#%d", time.Now().Unix())
×
401
        }
×
402
        mountOptions = appendDefaultMountOptions(mountOptions, tmpPath, containerName)
1✔
403

1✔
404
        args := targetPath
1✔
405
        for _, opt := range mountOptions {
7✔
406
                args = args + " " + opt
6✔
407
        }
6✔
408

409
        klog.V(2).Infof("target %v\nprotocol %v\n\nvolumeId %v\ncontext %v\nmountflags %v mountOptions %v volumeMountGroup %s\nargs %v\nserverAddress %v",
1✔
410
                targetPath, protocol, volumeID, attrib, mountFlags, mountOptions, volumeMountGroup, args, serverAddress)
1✔
411

1✔
412
        authEnv = append(authEnv, "AZURE_STORAGE_ACCOUNT="+accountName, "AZURE_STORAGE_BLOB_ENDPOINT="+serverAddress)
1✔
413
        if d.enableBlobMockMount {
2✔
414
                klog.Warningf("NodeStageVolume: mock mount on volumeID(%s), this is only for TESTING!!!", volumeID)
1✔
415
                if err := volumehelper.MakeDir(targetPath, os.FileMode(mountPermissions)); err != nil {
1✔
416
                        klog.Errorf("MakeDir failed on target: %s (%v)", targetPath, err)
×
417
                        return nil, status.Errorf(codes.Internal, err.Error())
×
418
                }
×
419
                return &csi.NodeStageVolumeResponse{}, nil
1✔
420
        }
421

422
        var output string
×
423
        if d.enableBlobfuseProxy {
×
424
                output, err = d.mountBlobfuseWithProxy(args, protocol, authEnv)
×
425
        } else {
×
426
                output, err = d.mountBlobfuseInsideDriver(args, protocol, authEnv)
×
427
        }
×
428

429
        if err != nil {
×
430
                var helpLinkMsg string
×
431
                if d.appendMountErrorHelpLink {
×
432
                        helpLinkMsg = "\nPlease refer to http://aka.ms/blobmounterror for possible causes and solutions for mount errors."
×
433
                }
×
434
                err = status.Errorf(codes.Internal, "Mount failed with error: %v, output: %v%s", err, output, helpLinkMsg)
×
435
                klog.Errorf("%v", err)
×
436
                notMnt, mntErr := d.mounter.IsLikelyNotMountPoint(targetPath)
×
437
                if mntErr != nil {
×
438
                        klog.Errorf("IsLikelyNotMountPoint check failed: %v", mntErr)
×
439
                        return nil, err
×
440
                }
×
441
                if !notMnt {
×
442
                        if mntErr = d.mounter.Unmount(targetPath); mntErr != nil {
×
443
                                klog.Errorf("Failed to unmount: %v", mntErr)
×
444
                                return nil, err
×
445
                        }
×
446
                        notMnt, mntErr := d.mounter.IsLikelyNotMountPoint(targetPath)
×
447
                        if mntErr != nil {
×
448
                                klog.Errorf("IsLikelyNotMountPoint check failed: %v", mntErr)
×
449
                                return nil, err
×
450
                        }
×
451
                        if !notMnt {
×
452
                                // This is very odd, we don't expect it.  We'll try again next sync loop.
×
453
                                klog.Errorf("%s is still mounted, despite call to unmount().  Will try again next sync loop.", targetPath)
×
454
                                return nil, err
×
455
                        }
×
456
                }
457
                os.Remove(targetPath)
×
458
                return nil, err
×
459
        }
460

461
        // wait a few seconds to make sure blobfuse mount is successful
462
        // please refer to https://github.com/Azure/azure-storage-fuse/pull/1088 for more details
463
        if err := waitForMount(targetPath, waitForMountInterval, waitForMountTimeout); err != nil {
×
464
                return nil, fmt.Errorf("failed to wait for mount: %w", err)
×
465
        }
×
466

467
        klog.V(2).Infof("volume(%s) mount on %q succeeded", volumeID, targetPath)
×
468
        return &csi.NodeStageVolumeResponse{}, nil
×
469
}
470

471
// NodeUnstageVolume unmount the volume from the staging path
472
func (d *Driver) NodeUnstageVolume(_ context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
3✔
473
        volumeID := req.GetVolumeId()
3✔
474
        if len(volumeID) == 0 {
4✔
475
                return nil, status.Error(codes.InvalidArgument, "Volume ID not provided")
1✔
476
        }
1✔
477

478
        stagingTargetPath := req.GetStagingTargetPath()
2✔
479
        if len(stagingTargetPath) == 0 {
3✔
480
                return nil, status.Error(codes.InvalidArgument, "Staging target not provided")
1✔
481
        }
1✔
482

483
        lockKey := fmt.Sprintf("%s-%s", volumeID, stagingTargetPath)
1✔
484
        if acquired := d.volumeLocks.TryAcquire(lockKey); !acquired {
1✔
485
                return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
×
486
        }
×
487
        defer d.volumeLocks.Release(lockKey)
1✔
488

1✔
489
        mc := metrics.NewMetricContext(blobCSIDriverName, "node_unstage_volume", d.cloud.ResourceGroup, "", d.Name)
1✔
490
        isOperationSucceeded := false
1✔
491
        defer func() {
2✔
492
                mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, volumeID)
1✔
493
        }()
1✔
494

495
        klog.V(2).Infof("NodeUnstageVolume: volume %s unmounting on %s", volumeID, stagingTargetPath)
1✔
496
        err := mount.CleanupMountPoint(stagingTargetPath, d.mounter, true /*extensiveMountPointCheck*/)
1✔
497
        if err != nil {
1✔
498
                return nil, status.Errorf(codes.Internal, "failed to unmount staging target %q: %v", stagingTargetPath, err)
×
499
        }
×
500
        klog.V(2).Infof("NodeUnstageVolume: volume %s unmount on %s successfully", volumeID, stagingTargetPath)
1✔
501

1✔
502
        isOperationSucceeded = true
1✔
503
        return &csi.NodeUnstageVolumeResponse{}, nil
1✔
504
}
505

506
// NodeGetCapabilities return the capabilities of the Node plugin
507
func (d *Driver) NodeGetCapabilities(_ context.Context, _ *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
1✔
508
        return &csi.NodeGetCapabilitiesResponse{
1✔
509
                Capabilities: d.NSCap,
1✔
510
        }, nil
1✔
511
}
1✔
512

513
// NodeGetInfo return info of the node on which this plugin is running
514
func (d *Driver) NodeGetInfo(_ context.Context, _ *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
1✔
515
        return &csi.NodeGetInfoResponse{
1✔
516
                NodeId: d.NodeID,
1✔
517
        }, nil
1✔
518
}
1✔
519

520
// NodeExpandVolume node expand volume
521
func (d *Driver) NodeExpandVolume(_ context.Context, _ *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
1✔
522
        return nil, status.Error(codes.Unimplemented, "NodeExpandVolume is not yet implemented")
1✔
523
}
1✔
524

525
// NodeGetVolumeStats get volume stats
526
func (d *Driver) NodeGetVolumeStats(_ context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
4✔
527
        if len(req.VolumeId) == 0 {
5✔
528
                return nil, status.Error(codes.InvalidArgument, "NodeGetVolumeStats volume ID was empty")
1✔
529
        }
1✔
530
        if len(req.VolumePath) == 0 {
4✔
531
                return nil, status.Error(codes.InvalidArgument, "NodeGetVolumeStats volume path was empty")
1✔
532
        }
1✔
533

534
        // check if the volume stats is cached
535
        cache, err := d.volStatsCache.Get(req.VolumeId, azcache.CacheReadTypeDefault)
2✔
536
        if err != nil {
2✔
537
                return nil, status.Errorf(codes.Internal, err.Error())
×
538
        }
×
539
        if cache != nil {
2✔
540
                resp := cache.(csi.NodeGetVolumeStatsResponse)
×
541
                klog.V(6).Infof("NodeGetVolumeStats: volume stats for volume %s path %s is cached", req.VolumeId, req.VolumePath)
×
542
                return &resp, nil
×
543
        }
×
544

545
        mc := metrics.NewMetricContext(blobCSIDriverName, "node_get_volume_stats", d.cloud.ResourceGroup, "", d.Name)
2✔
546
        isOperationSucceeded := false
2✔
547
        defer func() {
4✔
548
                mc.ObserveOperationWithResult(isOperationSucceeded, VolumeID, req.VolumeId)
2✔
549
        }()
2✔
550

551
        if _, err := os.Lstat(req.VolumePath); err != nil {
3✔
552
                if os.IsNotExist(err) {
2✔
553
                        return nil, status.Errorf(codes.NotFound, "path %s does not exist", req.VolumePath)
1✔
554
                }
1✔
555
                return nil, status.Errorf(codes.Internal, "failed to stat file %s: %v", req.VolumePath, err)
×
556
        }
557

558
        klog.V(6).Infof("NodeGetVolumeStats: begin to get VolumeStats on volume %s path %s", req.VolumeId, req.VolumePath)
1✔
559

1✔
560
        volumeMetrics, err := volume.NewMetricsStatFS(req.VolumePath).GetMetrics()
1✔
561
        if err != nil {
1✔
562
                return nil, status.Errorf(codes.Internal, "failed to get metrics: %v", err)
×
563
        }
×
564

565
        available, ok := volumeMetrics.Available.AsInt64()
1✔
566
        if !ok {
1✔
567
                return nil, status.Errorf(codes.Internal, "failed to transform volume available size(%v)", volumeMetrics.Available)
×
568
        }
×
569
        capacity, ok := volumeMetrics.Capacity.AsInt64()
1✔
570
        if !ok {
1✔
571
                return nil, status.Errorf(codes.Internal, "failed to transform volume capacity size(%v)", volumeMetrics.Capacity)
×
572
        }
×
573
        used, ok := volumeMetrics.Used.AsInt64()
1✔
574
        if !ok {
1✔
575
                return nil, status.Errorf(codes.Internal, "failed to transform volume used size(%v)", volumeMetrics.Used)
×
576
        }
×
577

578
        inodesFree, ok := volumeMetrics.InodesFree.AsInt64()
1✔
579
        if !ok {
1✔
580
                return nil, status.Errorf(codes.Internal, "failed to transform disk inodes free(%v)", volumeMetrics.InodesFree)
×
581
        }
×
582
        inodes, ok := volumeMetrics.Inodes.AsInt64()
1✔
583
        if !ok {
1✔
584
                return nil, status.Errorf(codes.Internal, "failed to transform disk inodes(%v)", volumeMetrics.Inodes)
×
585
        }
×
586
        inodesUsed, ok := volumeMetrics.InodesUsed.AsInt64()
1✔
587
        if !ok {
1✔
588
                return nil, status.Errorf(codes.Internal, "failed to transform disk inodes used(%v)", volumeMetrics.InodesUsed)
×
589
        }
×
590

591
        resp := &csi.NodeGetVolumeStatsResponse{
1✔
592
                Usage: []*csi.VolumeUsage{
1✔
593
                        {
1✔
594
                                Unit:      csi.VolumeUsage_BYTES,
1✔
595
                                Available: available,
1✔
596
                                Total:     capacity,
1✔
597
                                Used:      used,
1✔
598
                        },
1✔
599
                        {
1✔
600
                                Unit:      csi.VolumeUsage_INODES,
1✔
601
                                Available: inodesFree,
1✔
602
                                Total:     inodes,
1✔
603
                                Used:      inodesUsed,
1✔
604
                        },
1✔
605
                },
1✔
606
        }
1✔
607

1✔
608
        isOperationSucceeded = true
1✔
609
        klog.V(6).Infof("NodeGetVolumeStats: volume stats for volume %s path %s is %v", req.VolumeId, req.VolumePath, resp)
1✔
610
        // cache the volume stats per volume
1✔
611
        d.volStatsCache.Set(req.VolumeId, *resp)
1✔
612
        return resp, nil
1✔
613
}
614

615
// ensureMountPoint: create mount point if not exists
616
// return <true, nil> if it's already a mounted point otherwise return <false, nil>
617
func (d *Driver) ensureMountPoint(target string, perm os.FileMode) (bool, error) {
11✔
618
        notMnt, err := d.mounter.IsLikelyNotMountPoint(target)
11✔
619
        if err != nil && !os.IsNotExist(err) {
13✔
620
                if IsCorruptedDir(target) {
2✔
621
                        notMnt = false
×
622
                        klog.Warningf("detected corrupted mount for targetPath [%s]", target)
×
623
                } else {
2✔
624
                        return !notMnt, err
2✔
625
                }
2✔
626
        }
627

628
        // Check all the mountpoints in case IsLikelyNotMountPoint
629
        // cannot handle --bind mount
630
        mountList, err := d.mounter.List()
9✔
631
        if err != nil {
9✔
632
                return !notMnt, err
×
633
        }
×
634

635
        targetAbs, err := filepath.Abs(target)
9✔
636
        if err != nil {
9✔
637
                return !notMnt, err
×
638
        }
×
639

640
        for _, mountPoint := range mountList {
9✔
641
                if mountPoint.Path == targetAbs {
×
642
                        notMnt = false
×
643
                        break
×
644
                }
645
        }
646

647
        if !notMnt {
11✔
648
                // testing original mount point, make sure the mount link is valid
2✔
649
                _, err := os.ReadDir(target)
2✔
650
                if err == nil {
3✔
651
                        klog.V(2).Infof("already mounted to target %s", target)
1✔
652
                        return !notMnt, nil
1✔
653
                }
1✔
654
                // mount link is invalid, now unmount and remount later
655
                klog.Warningf("ReadDir %s failed with %v, unmount this directory", target, err)
1✔
656
                if err := d.mounter.Unmount(target); err != nil {
1✔
657
                        klog.Errorf("Unmount directory %s failed with %v", target, err)
×
658
                        return !notMnt, err
×
659
                }
×
660
                notMnt = true
1✔
661
                return !notMnt, err
1✔
662
        }
663
        if err := volumehelper.MakeDir(target, perm); err != nil {
9✔
664
                klog.Errorf("MakeDir failed on target: %s (%v)", target, err)
2✔
665
                return !notMnt, err
2✔
666
        }
2✔
667
        return !notMnt, nil
5✔
668
}
669

670
func waitForMount(path string, intervel, timeout time.Duration) error {
2✔
671
        timeAfter := time.After(timeout)
2✔
672
        timeTick := time.Tick(intervel)
2✔
673

2✔
674
        for {
8✔
675
                select {
6✔
676
                case <-timeTick:
5✔
677
                        notMount, err := mount.New("").IsLikelyNotMountPoint(path)
5✔
678
                        if err != nil {
6✔
679
                                return err
1✔
680
                        }
1✔
681
                        if !notMount {
4✔
682
                                klog.V(2).Infof("blobfuse mount at %s success", path)
×
683
                                return nil
×
684
                        }
×
685
                case <-timeAfter:
1✔
686
                        return fmt.Errorf("timeout waiting for mount %s", path)
1✔
687
                }
688
        }
689
}
690

691
// checkGidPresentInMountFlags reports whether at least one entry of
// mountFlags contains the substring "gid=". A nil or empty slice yields false.
func checkGidPresentInMountFlags(mountFlags []string) bool {
	hasGid := false
	// Stop scanning as soon as the first matching flag is seen.
	for i := 0; i < len(mountFlags) && !hasGid; i++ {
		hasGid = strings.Contains(mountFlags[i], "gid=")
	}
	return hasGid
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc