• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 25779881759

13 May 2026 05:11AM UTC coverage: 84.96% (-0.02%) from 84.975%
25779881759

Pull #2096

gh-ci

darcychristUWU
golang version needs to match go minimum version
Pull Request #2096: golang version needs to match go minimum version

45208 of 53211 relevant lines covered (84.96%)

126089.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.74
/embedded/appendable/multiapp/multi_app.go
1
/*
2
Copyright 2026 Codenotary Inc. All rights reserved.
3

4
SPDX-License-Identifier: BUSL-1.1
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    https://mariadb.com/bsl11/
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package multiapp
18

19
import (
20
        "context"
21
        "errors"
22
        "fmt"
23
        "io"
24
        "os"
25
        "path"
26
        "path/filepath"
27
        "strconv"
28
        "strings"
29
        "sync"
30

31
        "github.com/codenotary/immudb/embedded/appendable"
32
        "github.com/codenotary/immudb/embedded/appendable/fileutils"
33
        "github.com/codenotary/immudb/embedded/appendable/singleapp"
34
        "github.com/codenotary/immudb/embedded/cache"
35

36
        "golang.org/x/sync/singleflight"
37
)
38

39
var ErrorPathIsNotADirectory = errors.New("multiapp: path is not a directory")
40
var ErrIllegalArguments = errors.New("multiapp: illegal arguments")
41
var ErrInvalidOptions = fmt.Errorf("%w: invalid options", ErrIllegalArguments)
42
var ErrAlreadyClosed = errors.New("multiapp: already closed")
43
var ErrReadOnly = errors.New("multiapp: read-only mode")
44

45
const (
46
        metaFileSize    = "FILE_SIZE"
47
        metaWrappedMeta = "WRAPPED_METADATA"
48
)
49

50
//---------------------------------------------------------
51

52
var _ appendable.Appendable = (*MultiFileAppendable)(nil)
53

54
type MultiFileAppendableHooks interface {
55
        // Hook to open underlying appendable.
56
        // If needsWriteAccess is set to true, this appendable must be a single file appendable
57
        OpenAppendable(options *singleapp.Options, appname string, needsWriteAccess bool) (appendable.Appendable, error)
58

59
        // Hook to open the last underlying appendable that's available
60
        OpenInitialAppendable(opts *Options, singleAppOpts *singleapp.Options) (app appendable.Appendable, appID int64, err error)
61
}
62

63
type DefaultMultiFileAppendableHooks struct {
64
        path string
65
}
66

67
func (d *DefaultMultiFileAppendableHooks) OpenInitialAppendable(opts *Options, singleAppOpts *singleapp.Options) (app appendable.Appendable, appID int64, err error) {
20,379✔
68
        entries, err := os.ReadDir(d.path)
20,379✔
69
        if err != nil {
20,379✔
70
                return nil, 0, err
×
71
        }
×
72

73
        var filename string
20,379✔
74

20,379✔
75
        if len(entries) > 0 {
21,285✔
76
                filename = entries[len(entries)-1].Name()
906✔
77

906✔
78
                appID, err = strconv.ParseInt(strings.TrimSuffix(filename, filepath.Ext(filename)), 10, 64)
906✔
79
                if err != nil {
907✔
80
                        return nil, 0, err
1✔
81
                }
1✔
82
        } else {
19,473✔
83
                appID = 0
19,473✔
84
                filename = appendableName(appendableID(0, opts.fileSize), opts.fileExt)
19,473✔
85
        }
19,473✔
86

87
        app, err = d.OpenAppendable(singleAppOpts, filename, true)
20,378✔
88
        if err != nil {
20,379✔
89
                return nil, 0, err
1✔
90
        }
1✔
91

92
        return app, appID, nil
20,377✔
93
}
94

95
func (d *DefaultMultiFileAppendableHooks) OpenAppendable(options *singleapp.Options, appname string, needsWriteAccess bool) (appendable.Appendable, error) {
26,129✔
96
        return singleapp.Open(filepath.Join(d.path, appname), options)
26,129✔
97
}
26,129✔
98

99
type MultiFileAppendable struct {
100
        appendables appendableCache
101

102
        currAppID int64
103
        currApp   appendable.Appendable
104

105
        path           string
106
        readOnly       bool
107
        retryableSync  bool
108
        autoSync       bool
109
        fileMode       os.FileMode
110
        fileSize       int
111
        fileExt        string
112
        readBufferSize int
113
        prealloc       bool
114

115
        writeBuffer []byte // shared write-buffer only used by active appendable
116

117
        closed bool
118

119
        hooks MultiFileAppendableHooks
120

121
        mutex sync.Mutex
122

123
        // Sequential read-ahead state. Background goroutines open the
124
        // next prefetchAheadDepth chunks whenever ReadAt advances into a
125
        // new chunk in monotonically increasing order. singleflight keys
126
        // each open by appID so concurrent foreground+prefetch attempts
127
        // for the same chunk do at most one OpenAppendable. Cancelled on
128
        // Close via prefetchCancel.
129
        prefetchAheadDepth int
130
        prefetchCtx        context.Context
131
        prefetchCancel     context.CancelFunc
132
        prefetchSf         singleflight.Group
133
        prefetchPrevAppID  int64 // -1 sentinel = no previous read
134
}
135

136
func Open(path string, opts *Options) (*MultiFileAppendable, error) {
20,385✔
137
        return OpenWithHooks(path, &DefaultMultiFileAppendableHooks{
20,385✔
138
                path: path,
20,385✔
139
        }, opts)
20,385✔
140
}
20,385✔
141

142
func OpenWithHooks(path string, hooks MultiFileAppendableHooks, opts *Options) (*MultiFileAppendable, error) {
20,460✔
143
        err := opts.Validate()
20,460✔
144
        if err != nil {
20,461✔
145
                return nil, err
1✔
146
        }
1✔
147

148
        finfo, err := os.Stat(path)
20,459✔
149
        if err != nil {
39,967✔
150
                if !os.IsNotExist(err) || opts.readOnly {
19,508✔
151
                        return nil, err
×
152
                }
×
153

154
                err = os.Mkdir(path, opts.fileMode)
19,508✔
155
                if err != nil {
19,508✔
156
                        return nil, err
×
157
                }
×
158

159
                err = fileutils.SyncDir(path, filepath.Dir(path))
19,508✔
160
                if err != nil {
19,508✔
161
                        return nil, err
×
162
                }
×
163
        } else if !finfo.IsDir() {
957✔
164
                return nil, ErrorPathIsNotADirectory
6✔
165
        }
6✔
166

167
        m := appendable.NewMetadata(nil)
20,453✔
168
        m.PutInt(metaFileSize, opts.fileSize)
20,453✔
169
        m.Put(metaWrappedMeta, opts.metadata)
20,453✔
170

20,453✔
171
        var writeBuffer []byte
20,453✔
172

20,453✔
173
        if !opts.readOnly {
40,899✔
174
                // write buffer is only needed when appendable is not opened in read-only mode
20,446✔
175
                writeBuffer = make([]byte, opts.GetWriteBufferSize())
20,446✔
176
        }
20,446✔
177

178
        appendableOpts := singleapp.DefaultOptions().
20,453✔
179
                WithReadOnly(opts.readOnly).
20,453✔
180
                WithRetryableSync(opts.retryableSync).
20,453✔
181
                WithAutoSync(opts.autoSync).
20,453✔
182
                WithFileMode(opts.fileMode).
20,453✔
183
                WithCompressionFormat(opts.compressionFormat).
20,453✔
184
                WithCompresionLevel(opts.compressionLevel).
20,453✔
185
                WithReadBufferSize(opts.readBufferSize).
20,453✔
186
                WithWriteBuffer(writeBuffer).
20,453✔
187
                WithMetadata(m.Bytes())
20,453✔
188

20,453✔
189
        if opts.prealloc {
20,457✔
190
                appendableOpts.WithPreallocSize(opts.fileSize)
4✔
191
        }
4✔
192

193
        currApp, currAppID, err := hooks.OpenInitialAppendable(opts, appendableOpts)
20,453✔
194
        if err != nil {
20,457✔
195
                return nil, err
4✔
196
        }
4✔
197

198
        cache, err := cache.NewCache(opts.maxOpenedFiles)
20,449✔
199
        if err != nil {
20,449✔
200
                return nil, err
×
201
        }
×
202

203
        fileSize, _ := appendable.NewMetadata(currApp.Metadata()).GetInt(metaFileSize)
20,449✔
204

20,449✔
205
        pCtx, pCancel := context.WithCancel(context.Background())
20,449✔
206
        return &MultiFileAppendable{
20,449✔
207
                appendables:        appendableCache{cache: cache},
20,449✔
208
                currAppID:          currAppID,
20,449✔
209
                currApp:            currApp,
20,449✔
210
                path:               path,
20,449✔
211
                readOnly:           opts.readOnly,
20,449✔
212
                retryableSync:      opts.retryableSync,
20,449✔
213
                autoSync:           opts.autoSync,
20,449✔
214
                fileMode:           opts.fileMode,
20,449✔
215
                fileSize:           fileSize,
20,449✔
216
                fileExt:            opts.fileExt,
20,449✔
217
                readBufferSize:     opts.readBufferSize,
20,449✔
218
                prealloc:           opts.prealloc,
20,449✔
219
                writeBuffer:        writeBuffer,
20,449✔
220
                closed:             false,
20,449✔
221
                hooks:              hooks,
20,449✔
222
                prefetchAheadDepth: opts.prefetchAheadDepth,
20,449✔
223
                prefetchCtx:        pCtx,
20,449✔
224
                prefetchCancel:     pCancel,
20,449✔
225
                prefetchPrevAppID:  -1,
20,449✔
226
        }, nil
20,449✔
227
}
228

229
func appendableName(appID int64, ext string) string {
25,534✔
230
        return fmt.Sprintf("%08d.%s", appID, ext)
25,534✔
231
}
25,534✔
232

233
func appendableID(off int64, fileSize int) int64 {
1,204,207✔
234
        return off / int64(fileSize)
1,204,207✔
235
}
1,204,207✔
236

237
func (mf *MultiFileAppendable) Copy(dstPath string) error {
3✔
238
        mf.mutex.Lock()
3✔
239
        defer mf.mutex.Unlock()
3✔
240

3✔
241
        if mf.closed {
4✔
242
                return ErrAlreadyClosed
1✔
243
        }
1✔
244

245
        if !mf.readOnly {
4✔
246
                err := mf.sync()
2✔
247
                if err != nil {
2✔
248
                        return err
×
249
                }
×
250
        }
251

252
        err := os.MkdirAll(dstPath, mf.fileMode)
2✔
253
        if err != nil {
3✔
254
                return err
1✔
255
        }
1✔
256

257
        entries, err := os.ReadDir(mf.path)
1✔
258
        if err != nil {
1✔
259
                return err
×
260
        }
×
261

262
        for _, e := range entries {
4✔
263
                _, err = copyFile(path.Join(mf.path, e.Name()), path.Join(dstPath, e.Name()))
3✔
264
                if err != nil {
3✔
265
                        return err
×
266
                }
×
267
        }
268

269
        return nil
1✔
270
}
271

272
func copyFile(srcPath, dstPath string) (int64, error) {
3✔
273
        dstFile, err := os.Create(dstPath)
3✔
274
        if err != nil {
3✔
275
                return 0, err
×
276
        }
×
277
        defer dstFile.Close()
3✔
278

3✔
279
        srcFile, err := os.Open(srcPath)
3✔
280
        if err != nil {
3✔
281
                return 0, err
×
282
        }
×
283
        defer srcFile.Close()
3✔
284

3✔
285
        return io.Copy(dstFile, srcFile)
3✔
286
}
287

288
func (mf *MultiFileAppendable) CompressionFormat() int {
1✔
289
        mf.mutex.Lock()
1✔
290
        defer mf.mutex.Unlock()
1✔
291

1✔
292
        return mf.currApp.CompressionFormat()
1✔
293
}
1✔
294

295
func (mf *MultiFileAppendable) CompressionLevel() int {
1✔
296
        mf.mutex.Lock()
1✔
297
        defer mf.mutex.Unlock()
1✔
298

1✔
299
        return mf.currApp.CompressionLevel()
1✔
300
}
1✔
301

302
func (mf *MultiFileAppendable) Metadata() []byte {
6,768✔
303
        mf.mutex.Lock()
6,768✔
304
        defer mf.mutex.Unlock()
6,768✔
305

6,768✔
306
        bs, _ := appendable.NewMetadata(mf.currApp.Metadata()).Get(metaWrappedMeta)
6,768✔
307
        return bs
6,768✔
308
}
6,768✔
309

310
func (mf *MultiFileAppendable) Size() (int64, error) {
7,139✔
311
        mf.mutex.Lock()
7,139✔
312
        defer mf.mutex.Unlock()
7,139✔
313

7,139✔
314
        if mf.closed {
7,140✔
315
                return 0, ErrAlreadyClosed
1✔
316
        }
1✔
317
        currSize, err := mf.currApp.Size()
7,138✔
318
        if err != nil {
7,138✔
319
                return 0, err
×
320
        }
×
321

322
        return mf.currAppID*int64(mf.fileSize) + currSize, nil
7,138✔
323
}
324

325
func (mf *MultiFileAppendable) Append(bs []byte) (off int64, n int, err error) {
1,328,170✔
326
        mf.mutex.Lock()
1,328,170✔
327
        defer mf.mutex.Unlock()
1,328,170✔
328

1,328,170✔
329
        if mf.closed {
1,328,171✔
330
                return 0, 0, ErrAlreadyClosed
1✔
331
        }
1✔
332

333
        if mf.readOnly {
1,328,171✔
334
                return 0, 0, ErrReadOnly
2✔
335
        }
2✔
336

337
        if len(bs) == 0 {
1,328,169✔
338
                return 0, 0, ErrIllegalArguments
2✔
339
        }
2✔
340

341
        for n < len(bs) {
2,660,709✔
342
                available := mf.fileSize - int(mf.currApp.Offset())
1,332,544✔
343

1,332,544✔
344
                if available <= 0 {
1,337,188✔
345
                        // by switching to read-only mode, the write buffer is freed
4,644✔
346
                        err = mf.currApp.SwitchToReadOnlyMode()
4,644✔
347
                        if err != nil {
4,644✔
348
                                return off, n, err
×
349
                        }
×
350

351
                        _, ejectedApp, err := mf.appendables.Put(mf.currAppID, mf.currApp)
4,644✔
352
                        if err != nil {
4,644✔
353
                                return off, n, err
×
354
                        }
×
355

356
                        if ejectedApp != nil {
8,550✔
357
                                metricsCacheEvicted.Inc()
3,906✔
358
                                err = ejectedApp.Close()
3,906✔
359
                                if err != nil {
3,907✔
360
                                        return off, n, err
1✔
361
                                }
1✔
362

363
                        }
364

365
                        mf.currAppID++
4,643✔
366
                        currApp, err := mf.openAppendable(appendableName(mf.currAppID, mf.fileExt), true, true)
4,643✔
367
                        if err != nil {
4,643✔
368
                                return off, n, err
×
369
                        }
×
370

371
                        mf.currApp = currApp
4,643✔
372

4,643✔
373
                        err = currApp.SetOffset(0)
4,643✔
374
                        if err != nil {
4,643✔
375
                                return off, n, err
×
376
                        }
×
377

378
                        available = mf.fileSize
4,643✔
379
                }
380

381
                var d int
1,332,543✔
382

1,332,543✔
383
                if mf.currApp.CompressionFormat() == appendable.NoCompression {
2,662,084✔
384
                        d = minInt(available, len(bs)-n)
1,329,541✔
385
                } else {
1,332,543✔
386
                        d = len(bs) - n
3,002✔
387
                }
3,002✔
388

389
                offn, _, err := mf.currApp.Append(bs[n : n+d])
1,332,543✔
390
                if err != nil {
1,332,543✔
391
                        return off, n, err
×
392
                }
×
393

394
                if n == 0 {
2,660,707✔
395
                        off = offn + mf.currAppID*int64(mf.fileSize)
1,328,164✔
396
                }
1,328,164✔
397

398
                n += d
1,332,543✔
399
        }
400

401
        return
1,328,164✔
402
}
403

404
func (mf *MultiFileAppendable) openAppendable(appname string, createIfNotExists, activeChunk bool) (appendable.Appendable, error) {
4,648✔
405
        appendableOpts := singleapp.DefaultOptions().
4,648✔
406
                WithReadOnly(mf.readOnly).
4,648✔
407
                WithRetryableSync(mf.retryableSync).
4,648✔
408
                WithAutoSync(mf.autoSync).
4,648✔
409
                WithFileMode(mf.fileMode).
4,648✔
410
                WithCreateIfNotExists(createIfNotExists).
4,648✔
411
                WithReadBufferSize(mf.readBufferSize).
4,648✔
412
                WithCompressionFormat(mf.currApp.CompressionFormat()).
4,648✔
413
                WithCompresionLevel(mf.currApp.CompressionLevel()).
4,648✔
414
                WithMetadata(mf.currApp.Metadata())
4,648✔
415

4,648✔
416
        if mf.prealloc {
4,648✔
417
                appendableOpts.WithPreallocSize(mf.fileSize)
×
418
        }
×
419

420
        if activeChunk && !mf.readOnly {
9,296✔
421
                appendableOpts.WithWriteBuffer(mf.writeBuffer)
4,648✔
422
        }
4,648✔
423

424
        return mf.hooks.OpenAppendable(appendableOpts, appname, activeChunk)
4,648✔
425
}
426

427
func (mf *MultiFileAppendable) Offset() int64 {
1✔
428
        mf.mutex.Lock()
1✔
429
        defer mf.mutex.Unlock()
1✔
430

1✔
431
        return mf.offset()
1✔
432
}
1✔
433

434
func (mf *MultiFileAppendable) offset() int64 {
579,562✔
435
        return mf.currAppID*int64(mf.fileSize) + mf.currApp.Offset()
579,562✔
436
}
579,562✔
437

438
func (mf *MultiFileAppendable) SetOffset(off int64) error {
578,504✔
439
        mf.mutex.Lock()
578,504✔
440
        defer mf.mutex.Unlock()
578,504✔
441

578,504✔
442
        if mf.closed {
578,505✔
443
                return ErrAlreadyClosed
1✔
444
        }
1✔
445

446
        if mf.readOnly {
578,503✔
447
                return ErrReadOnly
×
448
        }
×
449

450
        currOffset := mf.offset()
578,503✔
451

578,503✔
452
        if off > currOffset {
578,503✔
453
                return fmt.Errorf("%w: provided offset %d is bigger than current one %d", ErrIllegalArguments, off, currOffset)
×
454
        }
×
455

456
        if off == currOffset {
1,156,979✔
457
                return nil
578,476✔
458
        }
578,476✔
459

460
        appID := appendableID(off, mf.fileSize)
27✔
461

27✔
462
        // given the new offset is lower than the current one, it means
27✔
463
        // either appID ==  mf.currAppID or appID < mf.currAppID must hold
27✔
464

27✔
465
        if mf.currAppID != appID {
32✔
466

5✔
467
                // Head might have moved back, this means that all
5✔
468
                // chunks that follow are no longer valid (will be overwritten anyway).
5✔
469
                // We also must flush / close current chunk since it will be reopened.
5✔
470
                for id := appID; id < mf.currAppID; id++ {
16✔
471
                        app, err := mf.appendables.Pop(id)
11✔
472
                        if errors.Is(err, cache.ErrKeyNotFound) {
20✔
473
                                continue
9✔
474
                        }
475
                        if err != nil {
2✔
476
                                return err
×
477
                        }
×
478
                        err = app.Close()
2✔
479
                        if err != nil {
2✔
480
                                return err
×
481
                        }
×
482
                }
483

484
                // close current appendable as it's not present in the cache
485
                err := mf.currApp.Close()
5✔
486
                if err != nil {
5✔
487
                        return err
×
488
                }
×
489

490
                app, err := mf.openAppendable(appendableName(appID, mf.fileExt), false, true)
5✔
491
                if err != nil {
8✔
492
                        if os.IsNotExist(err) {
3✔
493
                                return io.EOF
×
494
                        }
×
495
                        return err
3✔
496
                }
497

498
                mf.currAppID = appID
2✔
499
                mf.currApp = app
2✔
500
        }
501

502
        return mf.currApp.SetOffset(off % int64(mf.fileSize))
24✔
503
}
504

505
func (mf *MultiFileAppendable) DiscardUpto(off int64) error {
1,059✔
506
        mf.mutex.Lock()
1,059✔
507
        defer mf.mutex.Unlock()
1,059✔
508

1,059✔
509
        if mf.closed {
1,060✔
510
                return ErrAlreadyClosed
1✔
511
        }
1✔
512

513
        if mf.offset() < off {
1,059✔
514
                return fmt.Errorf("%w: discard beyond existent data boundaries", ErrIllegalArguments)
1✔
515
        }
1✔
516

517
        appID := appendableID(off, mf.fileSize)
1,057✔
518

1,057✔
519
        var dirSyncNeeded bool
1,057✔
520

1,057✔
521
        for i := int64(0); i < appID; i++ {
1,294✔
522
                if i == mf.currAppID {
238✔
523
                        break
1✔
524
                }
525

526
                app, err := mf.appendables.Pop(i)
236✔
527
                if err == nil {
331✔
528
                        err = app.Close()
95✔
529
                        if err != nil {
95✔
530
                                return err
×
531
                        }
×
532
                }
533

534
                appFile := filepath.Join(mf.path, appendableName(i, mf.fileExt))
236✔
535
                err = os.Remove(appFile)
236✔
536
                if err != nil && !os.IsNotExist(err) {
236✔
537
                        return err
×
538
                }
×
539

540
                dirSyncNeeded = true
236✔
541
        }
542

543
        if dirSyncNeeded {
1,095✔
544
                err := fileutils.SyncDir(mf.path)
38✔
545
                if err != nil {
38✔
546
                        return err
×
547
                }
×
548
        }
549

550
        return nil
1,057✔
551
}
552

553
func (mf *MultiFileAppendable) appendableFor(off int64) (appendable.Appendable, error) {
1,183,651✔
554
        mf.mutex.Lock()
1,183,651✔
555

1,183,651✔
556
        if mf.closed {
1,183,652✔
557
                mf.mutex.Unlock()
1✔
558
                return nil, ErrAlreadyClosed
1✔
559
        }
1✔
560

561
        appID := appendableID(off, mf.fileSize)
1,183,650✔
562

1,183,650✔
563
        if appID == mf.currAppID {
2,341,233✔
564
                metricsCacheHit.Inc()
1,157,583✔
565
                mf.maybePrefetchAheadLocked(appID)
1,157,583✔
566
                mf.mutex.Unlock()
1,157,583✔
567
                return mf.currApp, nil
1,157,583✔
568
        }
1,157,583✔
569

570
        // Cache hit fast path.
571
        if app, gerr := mf.appendables.Get(appID); gerr == nil {
50,964✔
572
                metricsCacheHit.Inc()
24,897✔
573
                mf.maybePrefetchAheadLocked(appID)
24,897✔
574
                mf.mutex.Unlock()
24,897✔
575
                return app, nil
24,897✔
576
        } else if !errors.Is(gerr, cache.ErrKeyNotFound) {
26,067✔
577
                mf.mutex.Unlock()
×
578
                return nil, gerr
×
579
        }
×
580

581
        metricsCacheMiss.Inc()
1,170✔
582

1,170✔
583
        // Snapshot the writer-mutable bits the open path needs, then
1,170✔
584
        // drop the mutex so the slow openAppendable can run in parallel
1,170✔
585
        // with other readers and (more importantly) with any in-flight
1,170✔
586
        // background prefetch for the same appID. Foreground and
1,170✔
587
        // prefetch coalesce on mf.prefetchSf so a single Get covers both.
1,170✔
588
        snap := openAppendableSnapshot{
1,170✔
589
                compressionFormat: mf.currApp.CompressionFormat(),
1,170✔
590
                compressionLevel:  mf.currApp.CompressionLevel(),
1,170✔
591
                metadata:          mf.currApp.Metadata(),
1,170✔
592
        }
1,170✔
593
        mf.mutex.Unlock()
1,170✔
594

1,170✔
595
        key := strconv.FormatInt(appID, 10)
1,170✔
596
        _, err, _ := mf.prefetchSf.Do(key, func() (interface{}, error) {
2,339✔
597
                // Race check: someone may have inserted while we were
1,169✔
598
                // waiting for the singleflight slot.
1,169✔
599
                mf.mutex.Lock()
1,169✔
600
                if mf.closed {
1,169✔
601
                        mf.mutex.Unlock()
×
602
                        return nil, ErrAlreadyClosed
×
603
                }
×
604
                if v, gerr := mf.appendables.Get(appID); gerr == nil {
1,169✔
605
                        mf.mutex.Unlock()
×
606
                        if rc, ok := v.(*refCountedApp); ok {
×
607
                                _ = rc.Release()
×
608
                        }
×
609
                        return nil, nil
×
610
                }
611
                mf.mutex.Unlock()
1,169✔
612

1,169✔
613
                raw, err := mf.openAppendableFromSnapshot(snap, appendableName(appID, mf.fileExt), false, false)
1,169✔
614
                if err != nil {
1,221✔
615
                        return nil, err
52✔
616
                }
52✔
617

618
                mf.mutex.Lock()
1,117✔
619
                defer mf.mutex.Unlock()
1,117✔
620
                if mf.closed {
1,117✔
621
                        _ = raw.Close()
×
622
                        return nil, ErrAlreadyClosed
×
623
                }
×
624
                // One last cache check: another opener might have raced us
625
                // while our open was outstanding.
626
                if v, gerr := mf.appendables.Get(appID); gerr == nil {
1,117✔
627
                        if rc, ok := v.(*refCountedApp); ok {
×
628
                                _ = rc.Release()
×
629
                        }
×
630
                        _ = raw.Close()
×
631
                        return nil, nil
×
632
                }
633
                _, ejected, perr := mf.appendables.Put(appID, raw)
1,117✔
634
                if perr != nil {
1,117✔
635
                        _ = raw.Close()
×
636
                        return nil, perr
×
637
                }
×
638
                if ejected != nil {
2,217✔
639
                        metricsCacheEvicted.Inc()
1,100✔
640
                        _ = ejected.Close()
1,100✔
641
                }
1,100✔
642
                return nil, nil
1,117✔
643
        })
644
        if err != nil {
1,222✔
645
                return nil, err
52✔
646
        }
52✔
647

648
        mf.mutex.Lock()
1,118✔
649
        defer mf.mutex.Unlock()
1,118✔
650

1,118✔
651
        if mf.closed {
1,118✔
652
                return nil, ErrAlreadyClosed
×
653
        }
×
654

655
        app, err := mf.appendables.Get(appID)
1,118✔
656
        if err != nil {
1,118✔
657
                return nil, err
×
658
        }
×
659

660
        mf.maybePrefetchAheadLocked(appID)
1,118✔
661
        return app, nil
1,118✔
662
}
663

664
// openAppendableSnapshot is a snapshot of the bits that openAppendable
665
// reads from mutable struct fields. Captured under mf.mutex by
666
// maybePrefetchAheadLocked so the spawned prefetch goroutine can call
667
// the per-singleapp open path without taking the multiapp mutex (and
668
// thus without serializing on the writer's chunk-rotation update of
669
// currApp).
670
type openAppendableSnapshot struct {
671
        compressionFormat int
672
        compressionLevel  int
673
        metadata          []byte
674
}
675

676
// maybePrefetchAheadLocked spawns background opens for the next
677
// prefetchAheadDepth chunks when the read pattern is sequential
678
// (current appID == previous appID + 1). For remoteapp-backed
679
// multiapps, each open downloads ~rangeCacheSize bytes — running
680
// them in parallel with the consumer overlaps per-chunk RTT with
681
// per-chunk decode work. Uses singleflight so a foreground miss on
682
// the same appID coalesces with an in-flight prefetch instead of
683
// duplicating the open.
684
//
685
// Caller must hold mf.mutex. The spawned goroutines do NOT take
686
// mf.mutex during their open call (they take it briefly only to
687
// insert into the cache), so foreground readers don't block on a
688
// background network I/O.
689
func (mf *MultiFileAppendable) maybePrefetchAheadLocked(appID int64) {
1,183,598✔
690
        if mf.prefetchAheadDepth <= 0 {
2,367,180✔
691
                mf.prefetchPrevAppID = appID
1,183,582✔
692
                return
1,183,582✔
693
        }
1,183,582✔
694
        if mf.closed {
16✔
695
                return
×
696
        }
×
697
        if mf.prefetchPrevAppID < 0 || mf.prefetchPrevAppID != appID-1 {
21✔
698
                // Not sequential — could be a random-access workload, or
5✔
699
                // the very first read (prefetchPrevAppID = -1 sentinel).
5✔
700
                // Reset and skip prefetch; we don't want to download
5✔
701
                // speculative chunks for random patterns or when we
5✔
702
                // haven't yet established a sequential trend.
5✔
703
                mf.prefetchPrevAppID = appID
5✔
704
                return
5✔
705
        }
5✔
706
        mf.prefetchPrevAppID = appID
11✔
707

11✔
708
        // Snapshot the per-currApp bits under the mutex so the goroutine
11✔
709
        // doesn't read mf.currApp concurrently with the writer's
11✔
710
        // chunk-rotation. CompressionFormat/Level/Metadata are themselves
11✔
711
        // stable across an appendable's lifetime, but mf.currApp is a
11✔
712
        // pointer that the writer reassigns when rotating to the next
11✔
713
        // chunk — that pointer reassignment is the race we're avoiding.
11✔
714
        snap := openAppendableSnapshot{
11✔
715
                compressionFormat: mf.currApp.CompressionFormat(),
11✔
716
                compressionLevel:  mf.currApp.CompressionLevel(),
11✔
717
                metadata:          mf.currApp.Metadata(),
11✔
718
        }
11✔
719

11✔
720
        ctx := mf.prefetchCtx
11✔
721
        for i := int64(1); i <= int64(mf.prefetchAheadDepth); i++ {
34✔
722
                nextID := appID + i
23✔
723
                // Don't prefetch the writer's currently active chunk or
23✔
724
                // anything beyond it. For remoteapp the OpenAppendable
23✔
725
                // hook would extend chunkInfos for nonexistent IDs and
23✔
726
                // leave intermediate slots in chunkState_Invalid, which
23✔
727
                // the writer then trips on with ErrInvalidChunkState.
23✔
728
                if nextID >= mf.currAppID {
33✔
729
                        break
10✔
730
                }
731
                // Already cached?
732
                if v, err := mf.appendables.Get(nextID); err == nil {
17✔
733
                        if rc, ok := v.(*refCountedApp); ok {
8✔
734
                                _ = rc.Release()
4✔
735
                        }
4✔
736
                        continue
4✔
737
                }
738

739
                key := strconv.FormatInt(nextID, 10)
9✔
740
                go mf.prefetchOne(ctx, nextID, key, snap)
9✔
741
        }
742
}
743

744
// prefetchOne is the worker that actually issues an open for the
745
// pre-warmed chunk. Runs in its own goroutine; coalesces concurrent
746
// callers via singleflight; bails on context cancellation.
747
func (mf *MultiFileAppendable) prefetchOne(ctx context.Context, appID int64, key string, snap openAppendableSnapshot) {
9✔
748
        _, _, _ = mf.prefetchSf.Do(key, func() (interface{}, error) {
16✔
749
                if ctx.Err() != nil {
7✔
750
                        return nil, ctx.Err()
×
751
                }
×
752
                raw, err := mf.openAppendableFromSnapshot(snap, appendableName(appID, mf.fileExt), false, false)
7✔
753
                if err != nil {
7✔
754
                        return nil, err
×
755
                }
×
756

757
                mf.mutex.Lock()
7✔
758
                defer mf.mutex.Unlock()
7✔
759

7✔
760
                if mf.closed {
7✔
761
                        _ = raw.Close()
×
762
                        return nil, ErrAlreadyClosed
×
763
                }
×
764
                // Lost the race against another opener? Drop our raw, keep
765
                // what's already cached.
766
                if existing, gerr := mf.appendables.Get(appID); gerr == nil {
10✔
767
                        if rc, ok := existing.(*refCountedApp); ok {
6✔
768
                                _ = rc.Release()
3✔
769
                        }
3✔
770
                        _ = raw.Close()
3✔
771
                        return nil, nil
3✔
772
                }
773
                _, ejected, perr := mf.appendables.Put(appID, raw)
4✔
774
                if perr != nil {
4✔
775
                        _ = raw.Close()
×
776
                        return nil, perr
×
777
                }
×
778
                if ejected != nil {
8✔
779
                        _ = ejected.Close()
4✔
780
                }
4✔
781
                return nil, nil
4✔
782
        })
783
}
784

785
// openAppendableFromSnapshot is the lock-free variant of
786
// openAppendable used by the prefetch goroutine. It reads only
787
// invariant struct fields (set once at construction) plus the
788
// caller-supplied snapshot of the writer-mutable currApp bits.
789
func (mf *MultiFileAppendable) openAppendableFromSnapshot(snap openAppendableSnapshot, appname string, createIfNotExists, activeChunk bool) (appendable.Appendable, error) {
1,176✔
790
        appendableOpts := singleapp.DefaultOptions().
1,176✔
791
                WithReadOnly(mf.readOnly).
1,176✔
792
                WithRetryableSync(mf.retryableSync).
1,176✔
793
                WithAutoSync(mf.autoSync).
1,176✔
794
                WithFileMode(mf.fileMode).
1,176✔
795
                WithCreateIfNotExists(createIfNotExists).
1,176✔
796
                WithReadBufferSize(mf.readBufferSize).
1,176✔
797
                WithCompressionFormat(snap.compressionFormat).
1,176✔
798
                WithCompresionLevel(snap.compressionLevel).
1,176✔
799
                WithMetadata(snap.metadata)
1,176✔
800

1,176✔
801
        if mf.prealloc {
1,176✔
802
                appendableOpts.WithPreallocSize(mf.fileSize)
×
803
        }
×
804

805
        return mf.hooks.OpenAppendable(appendableOpts, appname, activeChunk)
1,176✔
806
}
807

808
func (mf *MultiFileAppendable) ReadAt(bs []byte, off int64) (int, error) {
1,172,530✔
809
        if len(bs) == 0 {
1,172,532✔
810
                return 0, ErrIllegalArguments
2✔
811
        }
2✔
812

813
        metricsReads.Inc()
1,172,528✔
814

1,172,528✔
815
        r := 0
1,172,528✔
816

1,172,528✔
817
        for r < len(bs) {
2,356,178✔
818
                offr := off + int64(r)
1,183,650✔
819

1,183,650✔
820
                app, err := mf.appendableFor(offr)
1,183,650✔
821
                if err != nil {
1,183,703✔
822
                        metricsReadBytes.Add(float64(r))
53✔
823

53✔
824
                        if os.IsNotExist(err) {
105✔
825
                                return r, io.EOF
52✔
826
                        }
52✔
827

828
                        metricsReadErrors.Inc()
1✔
829
                        return r, err
1✔
830
                }
831

832
                rn, err := app.ReadAt(bs[r:], offr%int64(mf.fileSize))
1,183,597✔
833
                // If app is a refcounted handle from the cache, release it
1,183,597✔
834
                // now that the read is done. The currApp short-circuit in
1,183,597✔
835
                // appendableFor returns the writer-owned current appendable,
1,183,597✔
836
                // which is not refcounted; type-assert to skip release.
1,183,597✔
837
                if rc, ok := app.(*refCountedApp); ok {
1,209,612✔
838
                        if rerr := rc.Release(); rerr != nil && err == nil {
26,015✔
839
                                err = rerr
×
840
                        }
×
841
                }
842
                r += rn
1,183,597✔
843

1,183,597✔
844
                if errors.Is(err, io.EOF) {
1,204,716✔
845
                        if rn > 0 {
32,241✔
846
                                continue
11,122✔
847
                        }
848

849
                        metricsReadBytes.Add(float64(r))
9,997✔
850
                        return r, err
9,997✔
851
                }
852

853
                if err != nil {
1,162,485✔
854
                        metricsReadBytes.Add(float64(r))
7✔
855
                        metricsReadErrors.Inc()
7✔
856
                        return r, err
7✔
857
                }
7✔
858
        }
859

860
        metricsReadBytes.Add(float64(r))
1,162,471✔
861
        return r, nil
1,162,471✔
862
}
863

864
func (mf *MultiFileAppendable) SwitchToReadOnlyMode() error {
3✔
865
        mf.mutex.Lock()
3✔
866
        defer mf.mutex.Unlock()
3✔
867

3✔
868
        if mf.closed {
4✔
869
                return ErrAlreadyClosed
1✔
870
        }
1✔
871

872
        if mf.readOnly {
3✔
873
                return ErrReadOnly
1✔
874
        }
1✔
875

876
        // only current appendable needs to be switched to read-only mode
877
        err := mf.currApp.SwitchToReadOnlyMode()
1✔
878
        if err != nil {
1✔
879
                return err
×
880
        }
×
881

882
        mf.writeBuffer = nil
1✔
883
        mf.readOnly = true
1✔
884

1✔
885
        return nil
1✔
886
}
887

888
func (mf *MultiFileAppendable) Flush() error {
380,997✔
889
        mf.mutex.Lock()
380,997✔
890
        defer mf.mutex.Unlock()
380,997✔
891

380,997✔
892
        if mf.closed {
381,006✔
893
                return ErrAlreadyClosed
9✔
894
        }
9✔
895

896
        if mf.readOnly {
380,989✔
897
                return ErrReadOnly
1✔
898
        }
1✔
899

900
        return mf.currApp.Flush()
380,987✔
901
}
902

903
func (mf *MultiFileAppendable) Sync() error {
56,885✔
904
        mf.mutex.Lock()
56,885✔
905
        defer mf.mutex.Unlock()
56,885✔
906

56,885✔
907
        if mf.closed {
56,886✔
908
                return ErrAlreadyClosed
1✔
909
        }
1✔
910

911
        if mf.readOnly {
56,885✔
912
                return ErrReadOnly
1✔
913
        }
1✔
914

915
        return mf.sync()
56,883✔
916
}
917

918
func (mf *MultiFileAppendable) sync() error {
56,885✔
919
        // sync is only needed in the current appendable:
56,885✔
920
        // - with retryable sync, non-active appendables were already synced
56,885✔
921
        // - with non-retryable sync, data may be lost in previous flush or sync calls
56,885✔
922
        return mf.currApp.Sync()
56,885✔
923
}
56,885✔
924

925
func (mf *MultiFileAppendable) Close() error {
19,112✔
926
        mf.mutex.Lock()
19,112✔
927
        defer mf.mutex.Unlock()
19,112✔
928

19,112✔
929
        if mf.closed {
19,114✔
930
                return ErrAlreadyClosed
2✔
931
        }
2✔
932

933
        mf.closed = true
19,110✔
934
        if mf.prefetchCancel != nil {
38,220✔
935
                // Stop accepting new prefetch starts; in-flight ones will
19,110✔
936
                // notice mf.closed when they reach the cache-insert step.
19,110✔
937
                mf.prefetchCancel()
19,110✔
938
        }
19,110✔
939

940
        err := mf.appendables.Apply(func(k int64, v appendable.Appendable) error {
19,739✔
941
                return v.Close()
629✔
942
        })
629✔
943
        if err != nil {
19,111✔
944
                return err
1✔
945
        }
1✔
946

947
        return mf.currApp.Close()
19,109✔
948
}
949

950
func (mf *MultiFileAppendable) CurrApp() (appendable.Appendable, int64) {
1✔
951
        mf.mutex.Lock()
1✔
952
        defer mf.mutex.Unlock()
1✔
953
        return mf.currApp, mf.currAppID
1✔
954
}
1✔
955

956
func (mf *MultiFileAppendable) ReplaceCachedChunk(appID int64, app appendable.Appendable) (appendable.Appendable, error) {
114✔
957
        return mf.appendables.Replace(appID, app)
114✔
958
}
114✔
959

960
func minInt(a, b int) int {
1,329,541✔
961
        if a <= b {
1,334,232✔
962
                return a
4,691✔
963
        }
4,691✔
964
        return b
1,324,850✔
965
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc