• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

fogfish / stream / 11748698612

08 Nov 2024 07:56PM UTC coverage: 92.564% (-1.9%) from 94.437%
11748698612

push

github

web-flow
Update optional params of the library (#38)

32 of 45 new or added lines in 2 files covered. (71.11%)

5 existing lines in 2 files now uncovered.

722 of 780 relevant lines covered (92.56%)

1.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.24
/filesystem.go
1
//
2
// Copyright (C) 2024 Dmitry Kolesnikov
3
//
4
// This file may be modified and distributed under the terms
5
// of the MIT license.  See the LICENSE file for details.
6
// https://github.com/fogfish/stream
7
//
8

9
// Package stream provides a Golang file system abstraction tailored for AWS S3,
10
// enabling seamless streaming of binary objects along with their
11
// corresponding metadata. The package implements
12
// [Golang File System](https://pkg.go.dev/io/fs) and enhances it by adding
13
// support for writable files and type-safe metadata.
14
package stream
15

16
import (
17
        "context"
18
        "errors"
19
        "fmt"
20
        "io/fs"
21
        "regexp"
22
        "strings"
23
        "time"
24

25
        "github.com/aws/aws-sdk-go-v2/aws"
26
        "github.com/aws/aws-sdk-go-v2/service/s3"
27
        "github.com/fogfish/opts"
28
)
29

30
// FileSystem is a file system abstraction over a single S3 bucket.
// The type parameter T is the type of the object metadata exposed through
// StatSys.
type FileSystem[T any] struct {
	// Opts carries the configuration applied through Option values in New.
	Opts
	// bucket is the name of the mounted S3 bucket.
	bucket string
	// codec decodes S3 object headers into metadata of type T (see Stat).
	codec  *codec[T]
}
36

37
var (
38
        _ fs.FS              = (*FileSystem[struct{}])(nil)
39
        _ fs.StatFS          = (*FileSystem[struct{}])(nil)
40
        _ fs.ReadDirFS       = (*FileSystem[struct{}])(nil)
41
        _ fs.GlobFS          = (*FileSystem[struct{}])(nil)
42
        _ CreateFS[struct{}] = (*FileSystem[struct{}])(nil)
43
        _ RemoveFS           = (*FileSystem[struct{}])(nil)
44
        _ CopyFS             = (*FileSystem[struct{}])(nil)
45
)
46

47
// Create a file system instance, mounting S3 Bucket. Use Option type to
48
// configure file system.
49
func New[T any](bucket string, opt ...Option) (*FileSystem[T], error) {
1✔
50
        if len(bucket) == 0 {
1✔
NEW
51
                return nil, fmt.Errorf("bucket is not defined")
×
NEW
52
        }
×
53

54
        fsys := FileSystem[T]{
1✔
55
                Opts:   optsDefault(),
1✔
56
                bucket: bucket,
1✔
57
                codec:  newCodec[T](),
1✔
58
        }
1✔
59

1✔
60
        if err := opts.Apply(&fsys.Opts, opt); err != nil {
1✔
NEW
61
                return nil, err
×
UNCOV
62
        }
×
63

64
        if fsys.api == nil {
2✔
65
                if err := optsDefaultS3(&fsys.Opts); err != nil {
1✔
UNCOV
66
                        return nil, err
×
67
                }
×
68
        }
69

70
        return &fsys, fsys.checkRequired()
1✔
71
}
72

73
// Create a file system instance, mounting S3 Bucket. Use Option type to
74
// configure file system.
75
func NewFS(bucket string, opts ...Option) (*FileSystem[struct{}], error) {
1✔
76
        return New[struct{}](bucket, opts...)
1✔
77
}
1✔
78

79
// To open the file for writing use `Create` function giving the absolute path
80
// starting with `/`, the returned file descriptor is a composite of
81
// `io.Writer`, `io.Closer` and `stream.Stat`. Utilize Golang's convenient
82
// streaming methods to update S3 object seamlessly. Once all bytes are written,
83
// it's crucial to close the stream. Failure to do so would cause data loss.
84
// The object is considered successfully created on S3 only if all `Write`
85
// operations and subsequent `Close` actions are successful.
86
func (fsys *FileSystem[T]) Create(path string, attr *T) (File, error) {
1✔
87
        if err := RequireValidFile("create", path); err != nil {
2✔
88
                return nil, err
1✔
89
        }
1✔
90

91
        return newWriter(fsys, path, attr), nil
1✔
92
}
93

94
// To open the file for reading use `Open` function giving the absolute path
95
// starting with `/`, the returned file descriptor is a composite of
96
// `io.Reader`, `io.Closer` and `stream.Stat`. Utilize Golang's convenient
97
// streaming methods to consume S3 object seamlessly.
98
func (fsys *FileSystem[T]) Open(path string) (fs.File, error) {
1✔
99
        if err := RequireValidPath("open", path); err != nil {
2✔
100
                return nil, err
1✔
101
        }
1✔
102

103
        if IsValidDir(path) {
2✔
104
                return openDir(fsys, path), nil
1✔
105
        }
1✔
106

107
        return newReader(fsys, path), nil
1✔
108
}
109

110
// Stat returns a FileInfo describing the file.
111
// File system executes HeadObject S3 API call to obtain metadata.
112
func (fsys *FileSystem[T]) Stat(path string) (fs.FileInfo, error) {
1✔
113
        if err := RequireValidPath("stat", path); err != nil {
2✔
114
                return nil, err
1✔
115
        }
1✔
116

117
        info := info[T]{path: path}
1✔
118

1✔
119
        if IsValidDir(path) {
2✔
120
                info.mode = fs.ModeDir
1✔
121
                return info, nil
1✔
122
        }
1✔
123

124
        ctx, cancel := context.WithTimeout(context.Background(), fsys.timeout)
1✔
125
        defer cancel()
1✔
126

1✔
127
        req := &s3.HeadObjectInput{
1✔
128
                Bucket: aws.String(fsys.bucket),
1✔
129
                Key:    info.s3Key(),
1✔
130
        }
1✔
131

1✔
132
        val, err := fsys.api.HeadObject(ctx, req)
1✔
133
        if err != nil {
2✔
134
                switch {
1✔
135
                case recoverNotFound(err):
1✔
136
                        return nil, fs.ErrNotExist
1✔
137
                default:
1✔
138
                        return nil, &fs.PathError{
1✔
139
                                Op:   "stat",
1✔
140
                                Path: path,
1✔
141
                                Err:  err,
1✔
142
                        }
1✔
143
                }
144
        }
145

146
        info.size = aws.ToInt64(val.ContentLength)
1✔
147
        info.time = aws.ToTime(val.LastModified)
1✔
148
        info.attr = new(T)
1✔
149
        fsys.codec.DecodeHeadOutput(val, info.attr)
1✔
150

1✔
151
        if fsys.signer != nil && fsys.codec.s != nil {
2✔
152
                if url, err := fsys.preSignGetUrl(info.s3Key()); err == nil {
2✔
153
                        fsys.codec.s.Put(info.attr, url)
1✔
154
                }
1✔
155
        }
156

157
        return info, nil
1✔
158
}
159

160
// Returns file metadata of type T embedded into a FileInfo.
161
func (fsys *FileSystem[T]) StatSys(stat fs.FileInfo) *T {
1✔
162
        info, ok := stat.(info[T])
1✔
163
        if !ok {
1✔
164
                return nil
×
165
        }
×
166

167
        return info.attr
1✔
168
}
169

170
func (fsys *FileSystem[T]) preSignGetUrl(s3key *string) (string, error) {
1✔
171
        req := &s3.GetObjectInput{
1✔
172
                Bucket: aws.String(fsys.bucket),
1✔
173
                Key:    s3key,
1✔
174
        }
1✔
175

1✔
176
        ctx, cancel := context.WithTimeout(context.Background(), fsys.timeout)
1✔
177
        defer cancel()
1✔
178

1✔
179
        val, err := fsys.signer.PresignGetObject(ctx, req, s3.WithPresignExpires(fsys.ttlSignedUrl))
1✔
180
        if err != nil {
2✔
181
                return "", &fs.PathError{
1✔
182
                        Op:   "presign",
1✔
183
                        Path: "/" + aws.ToString(s3key),
1✔
184
                        Err:  err,
1✔
185
                }
1✔
186
        }
1✔
187

188
        return val.URL, nil
1✔
189
}
190

191
// Reads the named directory or path prefix.
192
// The classical file system organize data hierarchically into directories as
193
// opposed to the flat storage structure of general purpose AWS S3.
194
//
195
// It assumes a directory if the path ends with `/`.
196
//
197
// It return path relative to pattern for all found object.
198
func (fsys *FileSystem[T]) ReadDir(path string) ([]fs.DirEntry, error) {
1✔
199
        if err := RequireValidDir("readdir", path); err != nil {
2✔
200
                return nil, err
1✔
201
        }
1✔
202

203
        dd := openDir(fsys, path)
1✔
204

1✔
205
        return dd.ReadDir(-1)
1✔
206
}
207

208
// Glob returns the names of all files matching pattern.
209
// The classical file system organize data hierarchically into directories as
210
// opposed to the flat storage structure of general purpose AWS S3.
211
//
212
// It assumes a directory if the path ends with `/`.
213
//
214
// It return path relative to pattern for all found object.
215
//
216
// The pattern consists of S3 key prefix Golang regex. Its are split by `|`.
217
func (fsys *FileSystem[T]) Glob(pattern string) ([]string, error) {
1✔
218
        var reg *regexp.Regexp
1✔
219
        var err error
1✔
220

1✔
221
        pat := strings.SplitN(pattern, "|", 2)
1✔
222
        if len(pat) == 2 {
2✔
223
                reg, err = regexp.Compile(pat[1])
1✔
224
                if err != nil {
2✔
225
                        return nil, &fs.PathError{
1✔
226
                                Op:   "glob",
1✔
227
                                Path: pattern,
1✔
228
                                Err:  err,
1✔
229
                        }
1✔
230
                }
1✔
231
        }
232

233
        dir, err := fsys.ReadDir(pat[0])
1✔
234
        if err != nil {
1✔
235
                return nil, err
×
236
        }
×
237

238
        seq := make([]string, 0)
1✔
239
        for _, x := range dir {
2✔
240
                if reg == nil || reg.MatchString(x.Name()) {
2✔
241
                        seq = append(seq, x.Name())
1✔
242
                }
1✔
243
        }
244
        return seq, nil
1✔
245
}
246

247
// Remove object
248
func (fsys *FileSystem[T]) Remove(path string) error {
1✔
249
        if err := RequireValidFile("remove", path); err != nil {
2✔
250
                return err
1✔
251
        }
1✔
252

253
        ctx, cancel := context.WithTimeout(context.Background(), fsys.timeout)
1✔
254
        defer cancel()
1✔
255

1✔
256
        req := &s3.DeleteObjectInput{
1✔
257
                Bucket: &fsys.bucket,
1✔
258
                Key:    s3Key(path),
1✔
259
        }
1✔
260

1✔
261
        _, err := fsys.api.DeleteObject(ctx, req)
1✔
262
        if err != nil {
2✔
263
                return &fs.PathError{
1✔
264
                        Op:   "remove",
1✔
265
                        Path: path,
1✔
266
                        Err:  err,
1✔
267
                }
1✔
268
        }
1✔
269

270
        return nil
1✔
271
}
272

273
// Copy object from source location to the target.
274
// The target shall be absolute s3://bucket/key url.
275
func (fsys *FileSystem[T]) Copy(source, target string) error {
1✔
276
        if err := RequireValidPath("copy", source); err != nil {
2✔
277
                return err
1✔
278
        }
1✔
279

280
        if !strings.HasPrefix(target, "s3://") {
2✔
281
                return &fs.PathError{
1✔
282
                        Op:   "copy",
1✔
283
                        Path: target,
1✔
284
                        Err:  errors.New("s3:// prefix is required"),
1✔
285
                }
1✔
286
        }
1✔
287

288
        ctx, cancel := context.WithTimeout(context.Background(), fsys.timeout)
1✔
289
        defer cancel()
1✔
290

1✔
291
        req := &s3.CopyObjectInput{
1✔
292
                Bucket:     &fsys.bucket,
1✔
293
                Key:        s3Key(source),
1✔
294
                CopySource: aws.String(target[5:]),
1✔
295
        }
1✔
296

1✔
297
        _, err := fsys.api.CopyObject(ctx, req)
1✔
298
        if err != nil {
2✔
299
                return &fs.PathError{
1✔
300
                        Op:   "copy",
1✔
301
                        Path: target,
1✔
302
                        Err:  err,
1✔
303
                }
1✔
304
        }
1✔
305

306
        return nil
1✔
307
}
308

309
// Wait for timeout until path exists
310
func (fsys *FileSystem[T]) Wait(path string, timeout time.Duration) error {
1✔
311
        if err := RequireValidFile("wait", path); err != nil {
2✔
312
                return err
1✔
313
        }
1✔
314

315
        waiter := s3.NewObjectExistsWaiter(fsys.api)
1✔
316

1✔
317
        req := &s3.HeadObjectInput{
1✔
318
                Bucket: aws.String(fsys.bucket),
1✔
319
                Key:    s3Key(path),
1✔
320
        }
1✔
321

1✔
322
        err := waiter.Wait(context.Background(), req, timeout)
1✔
323
        if err != nil {
2✔
324
                return &fs.PathError{
1✔
325
                        Op:   "wait",
1✔
326
                        Path: path,
1✔
327
                        Err:  err,
1✔
328
                }
1✔
329
        }
1✔
330

331
        return nil
1✔
332
}
333

334
//------------------------------------------------------------------------------
335

336
// hasErrorCode reports whether the first error in err's chain that exposes
// an ErrorCode() string method returns exactly code. It returns false for a
// nil error or when no error in the chain exposes such a method.
func hasErrorCode(err error, code string) bool {
	var coded interface{ ErrorCode() string }

	return errors.As(err, &coded) && coded.ErrorCode() == code
}

// recoverNoSuchKey reports whether err carries the "NoSuchKey" error code.
func recoverNoSuchKey(err error) bool {
	return hasErrorCode(err, "NoSuchKey")
}

// recoverNotFound reports whether err carries the "NotFound" error code.
func recoverNotFound(err error) bool {
	return hasErrorCode(err, "NotFound")
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc