• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 24841644892

23 Apr 2026 02:44PM UTC coverage: 85.279% (-4.0%) from 89.306%
24841644892

push

gh-ci

web-flow
feat: v1.11.0 PostgreSQL compatibility and SQL feature expansion (#2090)

* Add structured audit logging with immutable audit trail

Introduces a new --audit-log flag that records all gRPC operations as
structured JSON events in immudb's tamper-proof KV store. Events are
stored under the audit: key prefix in systemdb, queryable via Scan and
verifiable via VerifiableGet. An async buffered writer ensures minimal
latency impact. Configurable event filtering (all/write/admin) via
--audit-log-events flag.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Add PostgreSQL ORM compatibility layer and verification functions

Extend the pgsql wire protocol with immudb verification functions
(immudb_state, immudb_verify_row, immudb_verify_tx, immudb_history,
immudb_tx) accessible via standard SQL SELECT statements.

Add pg_catalog resolvers (pg_attribute, pg_index, pg_constraint,
pg_type, pg_settings, pg_description) and information_schema
resolvers (tables, columns, schemata, key_column_usage) to support
ORM introspection from Django, SQLAlchemy, GORM, and ActiveRecord.

Add PostgreSQL compatibility functions: current_database,
current_schema, current_user, format_type, pg_encoding_to_char,
pg_get_expr, pg_get_constraintdef, obj_description, col_description,
has_table_privilege, has_schema_privilege, and others.

Add SHOW statement emulation for common ORM config queries and
schema-qualified name stripping for information_schema and public
schema references.

* Implement EXISTS and IN subquery support in SQL engine

Replace the previously stubbed ExistsBoolExp and InSubQueryExp
implementations with working non-correlated subquery execution.

EXISTS subqueries resolve the inner SELECT and check if any rows
are returned. IN subqueries resolve the inner SELECT, iterate the
result set, and compare each value against the outer expression.
Both support NOT variants (NOT EXISTS, NOT IN).

Correlated subqueries (referencing outer query columns) ar... (continued)

7254 of 10471 new or added lines in 124 files covered. (69.28%)

115 existing lines in 18 files now uncovered.

44599 of 52298 relevant lines covered (85.28%)

127676.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.6
/embedded/appendable/multiapp/multi_app.go
1
/*
2
Copyright 2026 Codenotary Inc. All rights reserved.
3

4
SPDX-License-Identifier: BUSL-1.1
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    https://mariadb.com/bsl11/
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package multiapp
18

19
import (
20
        "errors"
21
        "fmt"
22
        "io"
23
        "os"
24
        "path"
25
        "path/filepath"
26
        "strconv"
27
        "strings"
28
        "sync"
29

30
        "github.com/codenotary/immudb/embedded/appendable"
31
        "github.com/codenotary/immudb/embedded/appendable/fileutils"
32
        "github.com/codenotary/immudb/embedded/appendable/singleapp"
33
        "github.com/codenotary/immudb/embedded/cache"
34
)
35

36
// Sentinel errors returned by the multiapp package.
var (
	// ErrorPathIsNotADirectory is returned when the path handed to Open
	// exists but is not a directory.
	ErrorPathIsNotADirectory = errors.New("multiapp: path is not a directory")

	// ErrIllegalArguments is returned when a call receives invalid arguments.
	ErrIllegalArguments = errors.New("multiapp: illegal arguments")

	// ErrInvalidOptions wraps ErrIllegalArguments, so errors.Is matches both.
	ErrInvalidOptions = fmt.Errorf("%w: invalid options", ErrIllegalArguments)

	// ErrAlreadyClosed is returned by operations invoked after Close.
	ErrAlreadyClosed = errors.New("multiapp: already closed")

	// ErrReadOnly is returned by write-side operations on a read-only appendable.
	ErrReadOnly = errors.New("multiapp: read-only mode")
)
41

42
// Keys of the metadata entries written into every chunk by OpenWithHooks.
const (
	// metaFileSize holds the per-chunk file size the appendable was created with.
	metaFileSize = "FILE_SIZE"
	// metaWrappedMeta holds the caller-provided metadata, returned by Metadata().
	metaWrappedMeta = "WRAPPED_METADATA"
)
46

47
//---------------------------------------------------------
48

49
// Compile-time check that *MultiFileAppendable satisfies appendable.Appendable.
var _ appendable.Appendable = (*MultiFileAppendable)(nil)
50

51
// MultiFileAppendableHooks abstracts how the individual chunks backing a
// MultiFileAppendable are opened, so that callers of OpenWithHooks can
// substitute their own implementation.
type MultiFileAppendableHooks interface {
	// Hook to open underlying appendable.
	// If needsWriteAccess is set to true, this appendable must be a single file appendable
	OpenAppendable(options *singleapp.Options, appname string, needsWriteAccess bool) (appendable.Appendable, error)

	// Hook to open the last underlying appendable that's available
	OpenInitialAppendable(opts *Options, singleAppOpts *singleapp.Options) (app appendable.Appendable, appID int64, err error)
}
59

60
// DefaultMultiFileAppendableHooks is the hooks implementation installed by
// Open; it opens chunks as files located in a local directory.
type DefaultMultiFileAppendableHooks struct {
	// path is the directory that contains the chunk files.
	path string
}
63

64
func (d *DefaultMultiFileAppendableHooks) OpenInitialAppendable(opts *Options, singleAppOpts *singleapp.Options) (app appendable.Appendable, appID int64, err error) {
20,121✔
65
        entries, err := os.ReadDir(d.path)
20,121✔
66
        if err != nil {
20,121✔
67
                return nil, 0, err
×
68
        }
×
69

70
        var filename string
20,121✔
71

20,121✔
72
        if len(entries) > 0 {
21,021✔
73
                filename = entries[len(entries)-1].Name()
900✔
74

900✔
75
                appID, err = strconv.ParseInt(strings.TrimSuffix(filename, filepath.Ext(filename)), 10, 64)
900✔
76
                if err != nil {
901✔
77
                        return nil, 0, err
1✔
78
                }
1✔
79
        } else {
19,221✔
80
                appID = 0
19,221✔
81
                filename = appendableName(appendableID(0, opts.fileSize), opts.fileExt)
19,221✔
82
        }
19,221✔
83

84
        app, err = d.OpenAppendable(singleAppOpts, filename, true)
20,120✔
85
        if err != nil {
20,121✔
86
                return nil, 0, err
1✔
87
        }
1✔
88

89
        return app, appID, nil
20,119✔
90
}
91

92
func (d *DefaultMultiFileAppendableHooks) OpenAppendable(options *singleapp.Options, appname string, needsWriteAccess bool) (appendable.Appendable, error) {
25,939✔
93
        return singleapp.Open(filepath.Join(d.path, appname), options)
25,939✔
94
}
25,939✔
95

96
// MultiFileAppendable is an appendable.Appendable whose content is split
// across fixed-size chunks, each one being an underlying appendable.
// Offsets map to chunks through appendableID (offset / fileSize).
type MultiFileAppendable struct {
	// appendables caches opened chunks other than the current one.
	appendables appendableCache

	// currAppID is the ID of the chunk that is currently being appended to.
	currAppID int64
	// currApp is the chunk that is currently being appended to.
	currApp appendable.Appendable

	path           string      // directory holding the chunks
	readOnly       bool        // when set, write-side operations return ErrReadOnly
	retryableSync  bool        // forwarded to every chunk that gets opened
	autoSync       bool        // forwarded to every chunk that gets opened
	fileMode       os.FileMode // forwarded to every chunk; also used by Copy for the destination directory
	fileSize       int         // size of each chunk, as read from the initial chunk's metadata
	fileExt        string      // extension used when building chunk names
	readBufferSize int         // forwarded to every chunk that gets opened
	prealloc       bool        // when set, chunks are opened with a prealloc size of fileSize

	writeBuffer []byte // shared write-buffer only used by active appendable

	// closed is set by Close; most operations then return ErrAlreadyClosed.
	closed bool

	// hooks opens the underlying chunks.
	hooks MultiFileAppendableHooks

	// mutex guards the fields above in the methods that acquire it.
	mutex sync.Mutex
}
120

121
func Open(path string, opts *Options) (*MultiFileAppendable, error) {
20,127✔
122
        return OpenWithHooks(path, &DefaultMultiFileAppendableHooks{
20,127✔
123
                path: path,
20,127✔
124
        }, opts)
20,127✔
125
}
20,127✔
126

127
func OpenWithHooks(path string, hooks MultiFileAppendableHooks, opts *Options) (*MultiFileAppendable, error) {
20,202✔
128
        err := opts.Validate()
20,202✔
129
        if err != nil {
20,203✔
130
                return nil, err
1✔
131
        }
1✔
132

133
        finfo, err := os.Stat(path)
20,201✔
134
        if err != nil {
39,457✔
135
                if !os.IsNotExist(err) || opts.readOnly {
19,256✔
136
                        return nil, err
×
137
                }
×
138

139
                err = os.Mkdir(path, opts.fileMode)
19,256✔
140
                if err != nil {
19,256✔
141
                        return nil, err
×
142
                }
×
143

144
                err = fileutils.SyncDir(path, filepath.Dir(path))
19,256✔
145
                if err != nil {
19,256✔
146
                        return nil, err
×
147
                }
×
148
        } else if !finfo.IsDir() {
951✔
149
                return nil, ErrorPathIsNotADirectory
6✔
150
        }
6✔
151

152
        m := appendable.NewMetadata(nil)
20,195✔
153
        m.PutInt(metaFileSize, opts.fileSize)
20,195✔
154
        m.Put(metaWrappedMeta, opts.metadata)
20,195✔
155

20,195✔
156
        var writeBuffer []byte
20,195✔
157

20,195✔
158
        if !opts.readOnly {
40,383✔
159
                // write buffer is only needed when appendable is not opened in read-only mode
20,188✔
160
                writeBuffer = make([]byte, opts.GetWriteBufferSize())
20,188✔
161
        }
20,188✔
162

163
        appendableOpts := singleapp.DefaultOptions().
20,195✔
164
                WithReadOnly(opts.readOnly).
20,195✔
165
                WithRetryableSync(opts.retryableSync).
20,195✔
166
                WithAutoSync(opts.autoSync).
20,195✔
167
                WithFileMode(opts.fileMode).
20,195✔
168
                WithCompressionFormat(opts.compressionFormat).
20,195✔
169
                WithCompresionLevel(opts.compressionLevel).
20,195✔
170
                WithReadBufferSize(opts.readBufferSize).
20,195✔
171
                WithWriteBuffer(writeBuffer).
20,195✔
172
                WithMetadata(m.Bytes())
20,195✔
173

20,195✔
174
        if opts.prealloc {
20,199✔
175
                appendableOpts.WithPreallocSize(opts.fileSize)
4✔
176
        }
4✔
177

178
        currApp, currAppID, err := hooks.OpenInitialAppendable(opts, appendableOpts)
20,195✔
179
        if err != nil {
20,200✔
180
                return nil, err
5✔
181
        }
5✔
182

183
        cache, err := cache.NewCache(opts.maxOpenedFiles)
20,190✔
184
        if err != nil {
20,190✔
185
                return nil, err
×
186
        }
×
187

188
        fileSize, _ := appendable.NewMetadata(currApp.Metadata()).GetInt(metaFileSize)
20,190✔
189

20,190✔
190
        return &MultiFileAppendable{
20,190✔
191
                appendables:    appendableCache{cache: cache},
20,190✔
192
                currAppID:      currAppID,
20,190✔
193
                currApp:        currApp,
20,190✔
194
                path:           path,
20,190✔
195
                readOnly:       opts.readOnly,
20,190✔
196
                retryableSync:  opts.retryableSync,
20,190✔
197
                autoSync:       opts.autoSync,
20,190✔
198
                fileMode:       opts.fileMode,
20,190✔
199
                fileSize:       fileSize,
20,190✔
200
                fileExt:        opts.fileExt,
20,190✔
201
                readBufferSize: opts.readBufferSize,
20,190✔
202
                prealloc:       opts.prealloc,
20,190✔
203
                writeBuffer:    writeBuffer,
20,190✔
204
                closed:         false,
20,190✔
205
                hooks:          hooks,
20,190✔
206
        }, nil
20,190✔
207
}
208

209
// appendableName builds the file name of the chunk with the given ID:
// the ID zero-padded to at least eight digits, a dot, then ext.
func appendableName(appID int64, ext string) string {
	paddedID := fmt.Sprintf("%08d", appID)
	return paddedID + "." + ext
}
25,340✔
212

213
// appendableID maps a global offset to the ID of the chunk containing it,
// i.e. the integer quotient of off by the chunk size.
// fileSize must be non-zero, otherwise the division panics.
func appendableID(off int64, fileSize int) int64 {
	chunkSize := int64(fileSize)
	return off / chunkSize
}
1,206,003✔
216

217
func (mf *MultiFileAppendable) Copy(dstPath string) error {
3✔
218
        mf.mutex.Lock()
3✔
219
        defer mf.mutex.Unlock()
3✔
220

3✔
221
        if mf.closed {
4✔
222
                return ErrAlreadyClosed
1✔
223
        }
1✔
224

225
        if !mf.readOnly {
4✔
226
                err := mf.sync()
2✔
227
                if err != nil {
2✔
228
                        return err
×
229
                }
×
230
        }
231

232
        err := os.MkdirAll(dstPath, mf.fileMode)
2✔
233
        if err != nil {
3✔
234
                return err
1✔
235
        }
1✔
236

237
        entries, err := os.ReadDir(mf.path)
1✔
238
        if err != nil {
1✔
239
                return err
×
240
        }
×
241

242
        for _, e := range entries {
4✔
243
                _, err = copyFile(path.Join(mf.path, e.Name()), path.Join(dstPath, e.Name()))
3✔
244
                if err != nil {
3✔
245
                        return err
×
246
                }
×
247
        }
248

249
        return nil
1✔
250
}
251

252
// copyFile copies the content of srcPath into dstPath, creating dstPath or
// truncating it if it already exists. It returns the number of bytes copied.
//
// The source is opened before the destination is created, so a missing or
// unreadable source does not leave an empty destination file behind. An
// error closing the destination is reported when the copy itself succeeded,
// since a failed close may mean that not all data reached the file.
func copyFile(srcPath, dstPath string) (n int64, err error) {
	srcFile, err := os.Open(srcPath)
	if err != nil {
		return 0, err
	}
	// the source is only read from; its close error carries no information
	defer srcFile.Close()

	dstFile, err := os.Create(dstPath)
	if err != nil {
		return 0, err
	}
	defer func() {
		// keep the copy error if there is one, otherwise surface the close error
		if cerr := dstFile.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}()

	return io.Copy(dstFile, srcFile)
}
267

268
func (mf *MultiFileAppendable) CompressionFormat() int {
1✔
269
        mf.mutex.Lock()
1✔
270
        defer mf.mutex.Unlock()
1✔
271

1✔
272
        return mf.currApp.CompressionFormat()
1✔
273
}
1✔
274

275
func (mf *MultiFileAppendable) CompressionLevel() int {
1✔
276
        mf.mutex.Lock()
1✔
277
        defer mf.mutex.Unlock()
1✔
278

1✔
279
        return mf.currApp.CompressionLevel()
1✔
280
}
1✔
281

282
func (mf *MultiFileAppendable) Metadata() []byte {
6,682✔
283
        mf.mutex.Lock()
6,682✔
284
        defer mf.mutex.Unlock()
6,682✔
285

6,682✔
286
        bs, _ := appendable.NewMetadata(mf.currApp.Metadata()).Get(metaWrappedMeta)
6,682✔
287
        return bs
6,682✔
288
}
6,682✔
289

290
func (mf *MultiFileAppendable) Size() (int64, error) {
7,050✔
291
        mf.mutex.Lock()
7,050✔
292
        defer mf.mutex.Unlock()
7,050✔
293

7,050✔
294
        if mf.closed {
7,051✔
295
                return 0, ErrAlreadyClosed
1✔
296
        }
1✔
297
        currSize, err := mf.currApp.Size()
7,049✔
298
        if err != nil {
7,049✔
299
                return 0, err
×
300
        }
×
301

302
        return mf.currAppID*int64(mf.fileSize) + currSize, nil
7,049✔
303
}
304

305
// Append writes bs at the end of the appendable, rolling over to a new chunk
// whenever the current one has no space left.
//
// It returns the global offset at which the first byte was written (off) and
// the number of bytes of bs consumed (n). On error, off and n reflect what
// had been written before the failure.
//
// Errors: ErrAlreadyClosed after Close, ErrReadOnly in read-only mode,
// ErrIllegalArguments when bs is empty, or any error coming from the
// underlying chunks or from the chunk cache.
func (mf *MultiFileAppendable) Append(bs []byte) (off int64, n int, err error) {
	mf.mutex.Lock()
	defer mf.mutex.Unlock()

	if mf.closed {
		return 0, 0, ErrAlreadyClosed
	}

	if mf.readOnly {
		return 0, 0, ErrReadOnly
	}

	if len(bs) == 0 {
		return 0, 0, ErrIllegalArguments
	}

	for n < len(bs) {
		// space left in the current chunk
		available := mf.fileSize - int(mf.currApp.Offset())

		if available <= 0 {
			// current chunk is full: retire it into the cache and open the next one

			// by switching to read-only mode, the write buffer is freed
			err = mf.currApp.SwitchToReadOnlyMode()
			if err != nil {
				return off, n, err
			}

			// note: this err (and the ones below) shadow the named result,
			// which is why every return lists its values explicitly
			_, ejectedApp, err := mf.appendables.Put(mf.currAppID, mf.currApp)
			if err != nil {
				return off, n, err
			}

			if ejectedApp != nil {
				// the cache evicted another chunk to make room; close it
				metricsCacheEvicted.Inc()
				err = ejectedApp.Close()
				if err != nil {
					return off, n, err
				}

			}

			mf.currAppID++
			currApp, err := mf.openAppendable(appendableName(mf.currAppID, mf.fileExt), true, true)
			if err != nil {
				return off, n, err
			}

			mf.currApp = currApp

			// start writing at the beginning of the newly opened chunk
			err = currApp.SetOffset(0)
			if err != nil {
				return off, n, err
			}

			available = mf.fileSize
		}

		var d int

		if mf.currApp.CompressionFormat() == appendable.NoCompression {
			// uncompressed data is split at the chunk boundary
			d = minInt(available, len(bs)-n)
		} else {
			// with compression enabled the remaining data is handed over in one piece
			d = len(bs) - n
		}

		offn, _, err := mf.currApp.Append(bs[n : n+d])
		if err != nil {
			return off, n, err
		}

		if n == 0 {
			// first write: translate the chunk-local offset into a global one
			off = offn + mf.currAppID*int64(mf.fileSize)
		}

		n += d
	}

	return
}
383

384
func (mf *MultiFileAppendable) openAppendable(appname string, createIfNotExists, activeChunk bool) (appendable.Appendable, error) {
5,888✔
385
        appendableOpts := singleapp.DefaultOptions().
5,888✔
386
                WithReadOnly(mf.readOnly).
5,888✔
387
                WithRetryableSync(mf.retryableSync).
5,888✔
388
                WithAutoSync(mf.autoSync).
5,888✔
389
                WithFileMode(mf.fileMode).
5,888✔
390
                WithCreateIfNotExists(createIfNotExists).
5,888✔
391
                WithReadBufferSize(mf.readBufferSize).
5,888✔
392
                WithCompressionFormat(mf.currApp.CompressionFormat()).
5,888✔
393
                WithCompresionLevel(mf.currApp.CompressionLevel()).
5,888✔
394
                WithMetadata(mf.currApp.Metadata())
5,888✔
395

5,888✔
396
        if mf.prealloc {
5,888✔
397
                appendableOpts.WithPreallocSize(mf.fileSize)
×
398
        }
×
399

400
        if activeChunk && !mf.readOnly {
10,536✔
401
                appendableOpts.WithWriteBuffer(mf.writeBuffer)
4,648✔
402
        }
4,648✔
403

404
        return mf.hooks.OpenAppendable(appendableOpts, appname, activeChunk)
5,888✔
405
}
406

407
func (mf *MultiFileAppendable) Offset() int64 {
1✔
408
        mf.mutex.Lock()
1✔
409
        defer mf.mutex.Unlock()
1✔
410

1✔
411
        return mf.offset()
1✔
412
}
1✔
413

414
func (mf *MultiFileAppendable) offset() int64 {
576,032✔
415
        return mf.currAppID*int64(mf.fileSize) + mf.currApp.Offset()
576,032✔
416
}
576,032✔
417

418
func (mf *MultiFileAppendable) SetOffset(off int64) error {
575,017✔
419
        mf.mutex.Lock()
575,017✔
420
        defer mf.mutex.Unlock()
575,017✔
421

575,017✔
422
        if mf.closed {
575,018✔
423
                return ErrAlreadyClosed
1✔
424
        }
1✔
425

426
        if mf.readOnly {
575,016✔
427
                return ErrReadOnly
×
428
        }
×
429

430
        currOffset := mf.offset()
575,016✔
431

575,016✔
432
        if off > currOffset {
575,016✔
433
                return fmt.Errorf("%w: provided offset %d is bigger than current one %d", ErrIllegalArguments, off, currOffset)
×
434
        }
×
435

436
        if off == currOffset {
1,150,005✔
437
                return nil
574,989✔
438
        }
574,989✔
439

440
        appID := appendableID(off, mf.fileSize)
27✔
441

27✔
442
        // given the new offset is lower than the current one, it means
27✔
443
        // either appID ==  mf.currAppID or appID < mf.currAppID must hold
27✔
444

27✔
445
        if mf.currAppID != appID {
32✔
446

5✔
447
                // Head might have moved back, this means that all
5✔
448
                // chunks that follow are no longer valid (will be overwritten anyway).
5✔
449
                // We also must flush / close current chunk since it will be reopened.
5✔
450
                for id := appID; id < mf.currAppID; id++ {
16✔
451
                        app, err := mf.appendables.Pop(id)
11✔
452
                        if errors.Is(err, cache.ErrKeyNotFound) {
20✔
453
                                continue
9✔
454
                        }
455
                        if err != nil {
2✔
456
                                return err
×
457
                        }
×
458
                        err = app.Close()
2✔
459
                        if err != nil {
2✔
460
                                return err
×
461
                        }
×
462
                }
463

464
                // close current appendable as it's not present in the cache
465
                err := mf.currApp.Close()
5✔
466
                if err != nil {
5✔
467
                        return err
×
468
                }
×
469

470
                app, err := mf.openAppendable(appendableName(appID, mf.fileExt), false, true)
5✔
471
                if err != nil {
8✔
472
                        if os.IsNotExist(err) {
3✔
473
                                return io.EOF
×
474
                        }
×
475
                        return err
3✔
476
                }
477

478
                mf.currAppID = appID
2✔
479
                mf.currApp = app
2✔
480
        }
481

482
        return mf.currApp.SetOffset(off % int64(mf.fileSize))
24✔
483
}
484

485
// DiscardUpto removes the chunk files that lie entirely before off.
// The chunk containing off and the current chunk are never removed.
//
// Errors: ErrAlreadyClosed after Close, an error wrapping
// ErrIllegalArguments when off is beyond the current offset, or any error
// coming from closing a chunk, removing a file or syncing the directory.
func (mf *MultiFileAppendable) DiscardUpto(off int64) error {
	mf.mutex.Lock()
	defer mf.mutex.Unlock()

	if mf.closed {
		return ErrAlreadyClosed
	}

	if mf.offset() < off {
		return fmt.Errorf("%w: discard beyond existent data boundaries", ErrIllegalArguments)
	}

	appID := appendableID(off, mf.fileSize)

	var dirSyncNeeded bool

	// the scan always starts from chunk 0, including chunks already removed
	// by a previous call (their removal error is tolerated below)
	for i := int64(0); i < appID; i++ {
		if i == mf.currAppID {
			break
		}

		// close the chunk if it is cached; any Pop error is treated as
		// "not cached" and the file is removed regardless
		// NOTE(review): SetOffset only tolerates cache.ErrKeyNotFound here — confirm
		// that ignoring other Pop errors is intended
		app, err := mf.appendables.Pop(i)
		if err == nil {
			err = app.Close()
			if err != nil {
				return err
			}
		}

		appFile := filepath.Join(mf.path, appendableName(i, mf.fileExt))
		err = os.Remove(appFile)
		if err != nil && !os.IsNotExist(err) {
			return err
		}

		// set even when the file was already gone
		dirSyncNeeded = true
	}

	if dirSyncNeeded {
		err := fileutils.SyncDir(mf.path)
		if err != nil {
			return err
		}
	}

	return nil
}
532

533
func (mf *MultiFileAppendable) appendableFor(off int64) (appendable.Appendable, error) {
1,185,742✔
534
        mf.mutex.Lock()
1,185,742✔
535
        defer mf.mutex.Unlock()
1,185,742✔
536

1,185,742✔
537
        if mf.closed {
1,185,743✔
538
                return nil, ErrAlreadyClosed
1✔
539
        }
1✔
540

541
        appID := appendableID(off, mf.fileSize)
1,185,741✔
542

1,185,741✔
543
        if appID == mf.currAppID {
2,345,350✔
544
                metricsCacheHit.Inc()
1,159,609✔
545
                return mf.currApp, nil
1,159,609✔
546
        }
1,159,609✔
547

548
        app, err := mf.appendables.Get(appID)
26,132✔
549

26,132✔
550
        if err != nil {
27,372✔
551
                if !errors.Is(err, cache.ErrKeyNotFound) {
1,240✔
552
                        return nil, err
×
553
                }
×
554

555
                metricsCacheMiss.Inc()
1,240✔
556

1,240✔
557
                raw, err := mf.openAppendable(appendableName(appID, mf.fileExt), false, false)
1,240✔
558
                if err != nil {
1,292✔
559
                        return nil, err
52✔
560
                }
52✔
561

562
                _, ejectedApp, err := mf.appendables.Put(appID, raw)
1,188✔
563
                if err != nil {
1,188✔
564
                        return nil, err
×
565
                }
×
566

567
                if ejectedApp != nil {
2,359✔
568
                        metricsCacheEvicted.Inc()
1,171✔
569
                        err = ejectedApp.Close()
1,171✔
570
                        if err != nil {
1,171✔
571
                                return nil, err
×
572
                        }
×
573
                }
574

575
                // Re-fetch via Get so the caller gets the refcounted wrapper
576
                // (with one ref already taken). Returning the raw pointer
577
                // from openAppendable would bypass the refcount and reproduce
578
                // the close-while-reading race that this whole machinery
579
                // exists to prevent.
580
                app, err = mf.appendables.Get(appID)
1,188✔
581
                if err != nil {
1,188✔
NEW
582
                        return nil, err
×
NEW
583
                }
×
584
        } else {
24,892✔
585
                metricsCacheHit.Inc()
24,892✔
586
        }
24,892✔
587

588
        return app, nil
26,080✔
589
}
590

591
func (mf *MultiFileAppendable) ReadAt(bs []byte, off int64) (int, error) {
1,174,730✔
592
        if len(bs) == 0 {
1,174,732✔
593
                return 0, ErrIllegalArguments
2✔
594
        }
2✔
595

596
        metricsReads.Inc()
1,174,728✔
597

1,174,728✔
598
        r := 0
1,174,728✔
599

1,174,728✔
600
        for r < len(bs) {
2,360,469✔
601
                offr := off + int64(r)
1,185,741✔
602

1,185,741✔
603
                app, err := mf.appendableFor(offr)
1,185,741✔
604
                if err != nil {
1,185,794✔
605
                        metricsReadBytes.Add(float64(r))
53✔
606

53✔
607
                        if os.IsNotExist(err) {
105✔
608
                                return r, io.EOF
52✔
609
                        }
52✔
610

611
                        metricsReadErrors.Inc()
1✔
612
                        return r, err
1✔
613
                }
614

615
                rn, err := app.ReadAt(bs[r:], offr%int64(mf.fileSize))
1,185,688✔
616
                // If app is a refcounted handle from the cache, release it
1,185,688✔
617
                // now that the read is done. The currApp short-circuit in
1,185,688✔
618
                // appendableFor returns the writer-owned current appendable,
1,185,688✔
619
                // which is not refcounted; type-assert to skip release.
1,185,688✔
620
                if rc, ok := app.(*refCountedApp); ok {
1,211,768✔
621
                        if rerr := rc.Release(); rerr != nil && err == nil {
26,080✔
NEW
622
                                err = rerr
×
NEW
623
                        }
×
624
                }
625
                r += rn
1,185,688✔
626

1,185,688✔
627
                if errors.Is(err, io.EOF) {
1,206,523✔
628
                        if rn > 0 {
31,848✔
629
                                continue
11,013✔
630
                        }
631

632
                        metricsReadBytes.Add(float64(r))
9,822✔
633
                        return r, err
9,822✔
634
                }
635

636
                if err != nil {
1,164,860✔
637
                        metricsReadBytes.Add(float64(r))
7✔
638
                        metricsReadErrors.Inc()
7✔
639
                        return r, err
7✔
640
                }
7✔
641
        }
642

643
        metricsReadBytes.Add(float64(r))
1,164,846✔
644
        return r, nil
1,164,846✔
645
}
646

647
func (mf *MultiFileAppendable) SwitchToReadOnlyMode() error {
3✔
648
        mf.mutex.Lock()
3✔
649
        defer mf.mutex.Unlock()
3✔
650

3✔
651
        if mf.closed {
4✔
652
                return ErrAlreadyClosed
1✔
653
        }
1✔
654

655
        if mf.readOnly {
3✔
656
                return ErrReadOnly
1✔
657
        }
1✔
658

659
        // only current appendable needs to be switched to read-only mode
660
        err := mf.currApp.SwitchToReadOnlyMode()
1✔
661
        if err != nil {
1✔
662
                return err
×
663
        }
×
664

665
        mf.writeBuffer = nil
1✔
666
        mf.readOnly = true
1✔
667

1✔
668
        return nil
1✔
669
}
670

671
func (mf *MultiFileAppendable) Flush() error {
375,980✔
672
        mf.mutex.Lock()
375,980✔
673
        defer mf.mutex.Unlock()
375,980✔
674

375,980✔
675
        if mf.closed {
375,989✔
676
                return ErrAlreadyClosed
9✔
677
        }
9✔
678

679
        if mf.readOnly {
375,972✔
680
                return ErrReadOnly
1✔
681
        }
1✔
682

683
        return mf.currApp.Flush()
375,970✔
684
}
685

686
func (mf *MultiFileAppendable) Sync() error {
55,673✔
687
        mf.mutex.Lock()
55,673✔
688
        defer mf.mutex.Unlock()
55,673✔
689

55,673✔
690
        if mf.closed {
55,674✔
691
                return ErrAlreadyClosed
1✔
692
        }
1✔
693

694
        if mf.readOnly {
55,673✔
695
                return ErrReadOnly
1✔
696
        }
1✔
697

698
        return mf.sync()
55,671✔
699
}
700

701
func (mf *MultiFileAppendable) sync() error {
55,673✔
702
        // sync is only needed in the current appendable:
55,673✔
703
        // - with retryable sync, non-active appendables were already synced
55,673✔
704
        // - with non-retryable sync, data may be lost in previous flush or sync calls
55,673✔
705
        return mf.currApp.Sync()
55,673✔
706
}
55,673✔
707

708
func (mf *MultiFileAppendable) Close() error {
18,907✔
709
        mf.mutex.Lock()
18,907✔
710
        defer mf.mutex.Unlock()
18,907✔
711

18,907✔
712
        if mf.closed {
18,909✔
713
                return ErrAlreadyClosed
2✔
714
        }
2✔
715

716
        mf.closed = true
18,905✔
717

18,905✔
718
        err := mf.appendables.Apply(func(k int64, v appendable.Appendable) error {
19,540✔
719
                return v.Close()
635✔
720
        })
635✔
721
        if err != nil {
18,906✔
722
                return err
1✔
723
        }
1✔
724

725
        return mf.currApp.Close()
18,904✔
726
}
727

728
func (mf *MultiFileAppendable) CurrApp() (appendable.Appendable, int64) {
1✔
729
        mf.mutex.Lock()
1✔
730
        defer mf.mutex.Unlock()
1✔
731
        return mf.currApp, mf.currAppID
1✔
732
}
1✔
733

734
func (mf *MultiFileAppendable) ReplaceCachedChunk(appID int64, app appendable.Appendable) (appendable.Appendable, error) {
113✔
735
        return mf.appendables.Replace(appID, app)
113✔
736
}
113✔
737

738
// minInt returns the smaller of a and b.
func minInt(a, b int) int {
	if b < a {
		return b
	}
	return a
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc