• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 25364743170

05 May 2026 08:00AM UTC coverage: 85.038% (-0.2%) from 85.255%
25364743170

push

gh-ci

vchaindz
chore(deps): bump jackc/pgx/v5 v5.9.1 -> v5.9.2 (CVE-2026-41889)

Patches a low-severity SQL-injection edge case in pgx's simple-protocol
codepath when a dollar-quoted string literal embeds attacker-controllable
placeholder text (GHSA-j88v-2chj-qfwx).

Not exploitable in immudb: pgx is a test-only dependency used by
pkg/pgsql/server/pgsql_{hardened,compat_integration,integration}_test.go
to drive the wire-compat layer with a real Postgres client. It is not
in the production package graph (`go list -deps ./cmd/... ./pkg/...
./embedded/...` returns 0 for jackc/pgx). Bumped purely to keep the
Dependabot alert clean.

Verified:
  go build ./...                                ok
  go test -count=1 ./pkg/pgsql/server/          ok

45157 of 53102 relevant lines covered (85.04%)

126482.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.71
/pkg/database/database.go
1
/*
2
Copyright 2026 Codenotary Inc. All rights reserved.
3

4
SPDX-License-Identifier: BUSL-1.1
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    https://mariadb.com/bsl11/
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package database
18

19
import (
20
        "context"
21
        "crypto/sha256"
22
        "encoding/binary"
23
        "errors"
24
        "fmt"
25
        "math"
26
        "os"
27
        "path/filepath"
28
        "sync"
29
        "time"
30

31
        "github.com/codenotary/immudb/embedded/document"
32
        "github.com/codenotary/immudb/embedded/sql"
33
        "github.com/codenotary/immudb/embedded/store"
34

35
        "github.com/codenotary/immudb/embedded/logger"
36
        "github.com/codenotary/immudb/pkg/api/schema"
37
)
38

39
const (
40
        // MaxKeyResolutionLimit is the number of reference hops resolveValue
        // follows before giving up with ErrKeyResolutionLimitReached.
        MaxKeyResolutionLimit = 1
41
        // MaxKeyScanLimit is not referenced in this part of the file;
        // presumably an upper bound for scan results - TODO confirm.
        MaxKeyScanLimit       = 2500
42
)
43

44
// Sentinel errors exposed by the database package. Compare with errors.Is.
var (
45
        // ErrKeyResolutionLimitReached is returned by resolveValue when a
        // reference chain is longer than MaxKeyResolutionLimit.
        ErrKeyResolutionLimitReached  = errors.New("key resolution limit reached. It may be due to cyclic references")
46
        ErrResultSizeLimitExceeded    = errors.New("result size limit exceeded")
47
        ErrResultSizeLimitReached     = errors.New("result size limit reached")
48
        // ErrIllegalArguments and ErrIllegalState are the store package errors,
        // re-exported under this package's name.
        ErrIllegalArguments           = store.ErrIllegalArguments
49
        ErrIllegalState               = store.ErrIllegalState
50
        // ErrIsReplica is returned by Set when the database is a replica.
        ErrIsReplica                  = errors.New("database is read-only because it's a replica")
51
        ErrNotReplica                 = errors.New("database is NOT a replica")
52
        ErrReplicaDivergedFromPrimary = errors.New("replica diverged from primary")
53
        // ErrInvalidRevision is returned by getAtRevision when the requested
        // revision is outside the key's history.
        ErrInvalidRevision            = errors.New("invalid key revision number")
54
)
55

56
// DB is the set of operations offered by a single database: settings and
// state inspection, key-value access, SQL access, transaction-level access,
// truncation and index maintenance. It also embeds DocumentDatabase.
// OpenDB and NewDB return the implementation defined in this file.
type DB interface {
57
        GetName() string
58

59
        // Settings
60
        GetOptions() *Options
61

62
        Path() string
63

64
        AsReplica(asReplica, syncReplication bool, syncAcks int)
65
        IsReplica() bool
66

67
        IsSyncReplicationEnabled() bool
68
        SetSyncReplication(enabled bool)
69

70
        MaxResultSize() int
71

72
        // State
73
        Health() (waitingCount int, lastReleaseAt time.Time)
74
        CurrentState() (*schema.ImmutableState, error)
75

76
        Size() (uint64, error)
77

78
        TxCount() (uint64, error)
79

80
        // Key-Value
81
        Set(ctx context.Context, req *schema.SetRequest) (*schema.TxHeader, error)
82
        VerifiableSet(ctx context.Context, req *schema.VerifiableSetRequest) (*schema.VerifiableTx, error)
83

84
        Get(ctx context.Context, req *schema.KeyRequest) (*schema.Entry, error)
85
        VerifiableGet(ctx context.Context, req *schema.VerifiableGetRequest) (*schema.VerifiableEntry, error)
86
        GetAll(ctx context.Context, req *schema.KeyListRequest) (*schema.Entries, error)
87

88
        Delete(ctx context.Context, req *schema.DeleteKeysRequest) (*schema.TxHeader, error)
89

90
        SetReference(ctx context.Context, req *schema.ReferenceRequest) (*schema.TxHeader, error)
91
        VerifiableSetReference(ctx context.Context, req *schema.VerifiableReferenceRequest) (*schema.VerifiableTx, error)
92

93
        Scan(ctx context.Context, req *schema.ScanRequest) (*schema.Entries, error)
94

95
        History(ctx context.Context, req *schema.HistoryRequest) (*schema.Entries, error)
96

97
        ExecAll(ctx context.Context, operations *schema.ExecAllRequest) (*schema.TxHeader, error)
98

99
        Count(ctx context.Context, prefix *schema.KeyPrefix) (*schema.EntryCount, error)
100
        CountAll(ctx context.Context) (*schema.EntryCount, error)
101

102
        ZAdd(ctx context.Context, req *schema.ZAddRequest) (*schema.TxHeader, error)
103
        VerifiableZAdd(ctx context.Context, req *schema.VerifiableZAddRequest) (*schema.VerifiableTx, error)
104
        ZScan(ctx context.Context, req *schema.ZScanRequest) (*schema.ZEntries, error)
105

106
        // SQL-related
107
        NewSQLTx(ctx context.Context, opts *sql.TxOptions) (*sql.SQLTx, error)
108

109
        SQLExec(ctx context.Context, tx *sql.SQLTx, req *schema.SQLExecRequest) (ntx *sql.SQLTx, ctxs []*sql.SQLTx, err error)
110
        SQLExecPrepared(ctx context.Context, tx *sql.SQLTx, stmts []sql.SQLStmt, params map[string]interface{}) (ntx *sql.SQLTx, ctxs []*sql.SQLTx, err error)
111

112
        InferParameters(ctx context.Context, tx *sql.SQLTx, sql string) (map[string]sql.SQLValueType, error)
113
        InferParametersPrepared(ctx context.Context, tx *sql.SQLTx, stmt sql.SQLStmt) (map[string]sql.SQLValueType, error)
114

115
        SQLQuery(ctx context.Context, tx *sql.SQLTx, req *schema.SQLQueryRequest) (sql.RowReader, error)
116
        SQLQueryAll(ctx context.Context, tx *sql.SQLTx, req *schema.SQLQueryRequest) ([]*sql.Row, error)
117
        SQLQueryPrepared(ctx context.Context, tx *sql.SQLTx, stmt sql.DataSource, params map[string]interface{}) (sql.RowReader, error)
118

119
        VerifiableSQLGet(ctx context.Context, req *schema.VerifiableSQLGetRequest) (*schema.VerifiableSQLEntry, error)
120

121
        CopySQLCatalog(ctx context.Context, txID uint64) (uint64, error)
122

123
        ListTables(ctx context.Context, tx *sql.SQLTx) (*schema.SQLQueryResult, error)
124
        DescribeTable(ctx context.Context, tx *sql.SQLTx, table string) (*schema.SQLQueryResult, error)
125

126
        // Transactional layer
127
        WaitForTx(ctx context.Context, txID uint64, allowPrecommitted bool) error
128
        WaitForIndexingUpto(ctx context.Context, txID uint64) error
129

130
        TxByID(ctx context.Context, req *schema.TxRequest) (*schema.Tx, error)
131
        ExportTxByID(ctx context.Context, req *schema.ExportTxRequest) (txbs []byte, mayCommitUpToTxID uint64, mayCommitUpToAlh [sha256.Size]byte, err error)
132
        ReplicateTx(ctx context.Context, exportedTx []byte, skipIntegrityCheck bool, waitForIndexing bool) (*schema.TxHeader, error)
133
        AllowCommitUpto(txID uint64, alh [sha256.Size]byte) error
134
        DiscardPrecommittedTxsSince(txID uint64) error
135

136
        VerifiableTxByID(ctx context.Context, req *schema.VerifiableTxRequest) (*schema.VerifiableTx, error)
137
        TxScan(ctx context.Context, req *schema.TxScanRequest) (*schema.TxList, error)
138

139
        // Truncation
140
        FindTruncationPoint(ctx context.Context, until time.Time) (*schema.TxHeader, error)
141
        TruncateUptoTx(ctx context.Context, txID uint64) error
142

143
        // Maintenance
144
        FlushIndex(req *schema.FlushIndexRequest) error
145
        CompactIndex() error
146

147
        IsClosed() bool
148
        Close() error
149

150
        DocumentDatabase
151
}
152

153
// replicaState pairs a precommitted transaction ID with its Alh. OpenDB and
// NewDB only allocate the map holding these values for a primary database
// whose options request synchronous acknowledgements (syncAcks > 0).
type replicaState struct {
154
        precommittedTxID uint64
155
        precommittedAlh  [sha256.Size]byte
156
}
157

158
// db is the DB implementation returned by OpenDB and NewDB.
159
type db struct {
160
        // st is the underlying store obtained from store.Open.
        st *store.ImmuStore
161

162
        // sqlEngine and documentEngine are both created on top of st.
        sqlEngine      *sql.Engine
163
        documentEngine *document.Engine
164

165
        // mutex is read-locked by Set; its State() backs Health.
        mutex        *instrumentedRWMutex
166
        closingMutex sync.Mutex
167

168
        Logger  logger.Logger
169
        options *Options
170

171
        name string
172

173
        // maxResultSize is copied from the options at construction time.
        maxResultSize int
174

175
        // txPool is the pool created with st.NewTxHolderPool; allocTx and
        // releaseTx borrow and return transactions from it.
        txPool store.TxPool
176

177
        // replicaStates is nil unless the database is a primary with
        // syncAcks > 0.
        replicaStates      map[string]*replicaState
178
        replicaStatesMutex sync.Mutex
179
}
180

181
// OpenDB Opens an existing Database from disk
182
func OpenDB(
183
        dbName string,
184
        multidbHandler sql.MultiDBHandler,
185
        opts *Options,
186
        log logger.Logger,
187
) (DB, error) {
452✔
188
        if dbName == "" {
453✔
189
                return nil, fmt.Errorf("%w: invalid database name provided '%s'", ErrIllegalArguments, dbName)
1✔
190
        }
1✔
191

192
        log.Infof("opening database '%s' {replica = %v}...", dbName, opts.replica)
451✔
193

451✔
194
        var replicaStates map[string]*replicaState
451✔
195
        // replica states are only managed in primary with synchronous replication
451✔
196
        if !opts.replica && opts.syncAcks > 0 {
463✔
197
                replicaStates = make(map[string]*replicaState, opts.syncAcks)
12✔
198
        }
12✔
199

200
        dbi := &db{
451✔
201
                Logger:        log,
451✔
202
                options:       opts,
451✔
203
                name:          dbName,
451✔
204
                replicaStates: replicaStates,
451✔
205
                maxResultSize: opts.maxResultSize,
451✔
206
                mutex:         &instrumentedRWMutex{},
451✔
207
        }
451✔
208

451✔
209
        dbDir := dbi.Path()
451✔
210
        _, err := os.Stat(dbDir)
451✔
211
        if os.IsNotExist(err) {
452✔
212
                return nil, fmt.Errorf("missing database directories: %s", dbDir)
1✔
213
        }
1✔
214

215
        stOpts := opts.GetStoreOptions().
450✔
216
                WithLogger(log).
450✔
217
                WithMultiIndexing(true).
450✔
218
                WithExternalCommitAllowance(opts.syncReplication)
450✔
219

450✔
220
        dbi.st, err = store.Open(dbDir, stOpts)
450✔
221
        if err != nil {
450✔
222
                return nil, logErr(dbi.Logger, "unable to open database: %s", err)
×
223
        }
×
224

225
        for _, prefix := range []byte{SetKeyPrefix, SortedSetKeyPrefix} {
1,350✔
226
                err := dbi.st.InitIndexing(&store.IndexSpec{
900✔
227
                        SourcePrefix: []byte{prefix},
900✔
228
                        TargetPrefix: []byte{prefix},
900✔
229
                })
900✔
230
                if err != nil {
900✔
231
                        dbi.st.Close()
×
232
                        return nil, logErr(dbi.Logger, "unable to open database: %s", err)
×
233
                }
×
234
        }
235

236
        dbi.Logger.Infof("loading sql-engine for database '%s' {replica = %v}...", dbName, opts.replica)
450✔
237

450✔
238
        sqlOpts := sql.DefaultOptions().
450✔
239
                WithPrefix([]byte{SQLPrefix}).
450✔
240
                WithMultiDBHandler(multidbHandler).
450✔
241
                WithParseTxMetadataFunc(parseTxMetadata)
450✔
242

450✔
243
        dbi.sqlEngine, err = sql.NewEngine(dbi.st, sqlOpts)
450✔
244
        if err != nil {
450✔
245
                dbi.Logger.Errorf("unable to load sql-engine for database '%s' {replica = %v}. %v", dbName, opts.replica, err)
×
246
                return nil, err
×
247
        }
×
248

249
        dbi.Logger.Infof("sql-engine ready for database '%s' {replica = %v}", dbName, opts.replica)
450✔
250

450✔
251
        dbi.documentEngine, err = document.NewEngine(dbi.st, document.DefaultOptions().WithPrefix([]byte{DocumentPrefix}))
450✔
252
        if err != nil {
450✔
253
                return nil, err
×
254
        }
×
255
        dbi.Logger.Infof("document-engine ready for database '%s' {replica = %v}", dbName, opts.replica)
450✔
256

450✔
257
        txPool, err := dbi.st.NewTxHolderPool(opts.readTxPoolSize, false)
450✔
258
        if err != nil {
450✔
259
                return nil, logErr(dbi.Logger, "unable to create tx pool: %s", err)
×
260
        }
×
261
        dbi.txPool = txPool
450✔
262

450✔
263
        if opts.replica {
487✔
264
                dbi.Logger.Infof("database '%s' {replica = %v} successfully opened", dbName, opts.replica)
37✔
265
                return dbi, nil
37✔
266
        }
37✔
267

268
        dbi.Logger.Infof("database '%s' {replica = %v} successfully opened", dbName, opts.replica)
413✔
269

413✔
270
        return dbi, nil
413✔
271
}
272

273
func parseTxMetadata(data []byte) (map[string]interface{}, error) {
1✔
274
        md := schema.Metadata{}
1✔
275
        if err := md.Unmarshal(data); err != nil {
1✔
276
                return nil, err
×
277
        }
×
278

279
        meta := make(map[string]interface{}, len(md))
1✔
280
        for k, v := range md {
3✔
281
                meta[k] = v
2✔
282
        }
2✔
283
        return meta, nil
1✔
284
}
285

286
func (d *db) Path() string {
451✔
287
        return filepath.Join(d.options.GetDBRootPath(), d.GetName())
451✔
288
}
451✔
289

290
func (d *db) allocTx() (*store.Tx, error) {
10,677✔
291
        tx, err := d.txPool.Alloc()
10,677✔
292
        if errors.Is(err, store.ErrTxPoolExhausted) {
10,677✔
293
                return nil, ErrTxReadPoolExhausted
×
294
        }
×
295
        return tx, err
10,677✔
296
}
297

298
func (d *db) releaseTx(tx *store.Tx) {
10,677✔
299
        d.txPool.Release(tx)
10,677✔
300
}
10,677✔
301

302
// NewDB Creates a new Database along with it's directories and files
303
func NewDB(dbName string, multidbHandler sql.MultiDBHandler, opts *Options, log logger.Logger) (DB, error) {
387✔
304
        if dbName == "" {
387✔
305
                return nil, fmt.Errorf("%w: invalid database name provided '%s'", ErrIllegalArguments, dbName)
×
306
        }
×
307

308
        log.Infof("creating database '%s' {replica = %v}...", dbName, opts.replica)
387✔
309

387✔
310
        var replicaStates map[string]*replicaState
387✔
311
        // replica states are only managed in primary with synchronous replication
387✔
312
        if !opts.replica && opts.syncAcks > 0 {
387✔
313
                replicaStates = make(map[string]*replicaState, opts.syncAcks)
×
314
        }
×
315

316
        dbi := &db{
387✔
317
                Logger:        log,
387✔
318
                options:       opts,
387✔
319
                name:          dbName,
387✔
320
                replicaStates: replicaStates,
387✔
321
                maxResultSize: opts.maxResultSize,
387✔
322
                mutex:         &instrumentedRWMutex{},
387✔
323
        }
387✔
324

387✔
325
        dbDir := filepath.Join(opts.GetDBRootPath(), dbName)
387✔
326

387✔
327
        _, err := os.Stat(dbDir)
387✔
328
        if err == nil {
388✔
329
                return nil, fmt.Errorf("database directories already exist: %s", dbDir)
1✔
330
        }
1✔
331

332
        if err = os.MkdirAll(dbDir, os.ModePerm); err != nil {
387✔
333
                return nil, logErr(dbi.Logger, "unable to create data folder: %s", err)
1✔
334
        }
1✔
335

336
        stOpts := opts.GetStoreOptions().
385✔
337
                WithExternalCommitAllowance(opts.syncReplication).
385✔
338
                WithMultiIndexing(true).
385✔
339
                WithLogger(log)
385✔
340

385✔
341
        dbi.st, err = store.Open(dbDir, stOpts)
385✔
342
        if err != nil {
385✔
343
                return nil, logErr(dbi.Logger, "unable to open database: %s", err)
×
344
        }
×
345

346
        for _, prefix := range []byte{SetKeyPrefix, SortedSetKeyPrefix} {
1,155✔
347
                err := dbi.st.InitIndexing(&store.IndexSpec{
770✔
348
                        SourcePrefix: []byte{prefix},
770✔
349
                        TargetPrefix: []byte{prefix},
770✔
350
                })
770✔
351
                if err != nil {
770✔
352
                        dbi.st.Close()
×
353
                        return nil, logErr(dbi.Logger, "unable to open database: %s", err)
×
354
                }
×
355
        }
356

357
        txPool, err := dbi.st.NewTxHolderPool(opts.readTxPoolSize, false)
385✔
358
        if err != nil {
385✔
359
                return nil, logErr(dbi.Logger, "unable to create tx pool: %s", err)
×
360
        }
×
361
        dbi.txPool = txPool
385✔
362

385✔
363
        sqlOpts := sql.DefaultOptions().
385✔
364
                WithPrefix([]byte{SQLPrefix}).
385✔
365
                WithMultiDBHandler(multidbHandler).
385✔
366
                WithParseTxMetadataFunc(parseTxMetadata)
385✔
367

385✔
368
        dbi.Logger.Infof("loading sql-engine for database '%s' {replica = %v}...", dbName, opts.replica)
385✔
369

385✔
370
        dbi.sqlEngine, err = sql.NewEngine(dbi.st, sqlOpts)
385✔
371
        if err != nil {
385✔
372
                dbi.Logger.Errorf("unable to load sql-engine for database '%s' {replica = %v}. %v", dbName, opts.replica, err)
×
373
                return nil, err
×
374
        }
×
375
        dbi.Logger.Infof("sql-engine ready for database '%s' {replica = %v}", dbName, opts.replica)
385✔
376

385✔
377
        dbi.documentEngine, err = document.NewEngine(dbi.st, document.DefaultOptions().WithPrefix([]byte{DocumentPrefix}))
385✔
378
        if err != nil {
385✔
379
                return nil, logErr(dbi.Logger, "Unable to open database: %s", err)
×
380
        }
×
381
        dbi.Logger.Infof("document-engine ready for database '%s' {replica = %v}", dbName, opts.replica)
385✔
382

385✔
383
        dbi.Logger.Infof("database '%s' successfully created {replica = %v}", dbName, opts.replica)
385✔
384

385✔
385
        return dbi, nil
385✔
386
}
387

388
func (d *db) MaxResultSize() int {
5✔
389
        return d.maxResultSize
5✔
390
}
5✔
391

392
func (d *db) FlushIndex(req *schema.FlushIndexRequest) error {
6✔
393
        if req == nil {
7✔
394
                return store.ErrIllegalArguments
1✔
395
        }
1✔
396
        return d.st.FlushIndexes(req.CleanupPercentage, req.Synced)
5✔
397
}
398

399
// CompactIndex ...
400
func (d *db) CompactIndex() error {
4✔
401
        return d.st.CompactIndexes()
4✔
402
}
4✔
403

404
// Set ...
405
func (d *db) Set(ctx context.Context, req *schema.SetRequest) (*schema.TxHeader, error) {
4,463✔
406
        d.mutex.RLock()
4,463✔
407
        defer d.mutex.RUnlock()
4,463✔
408

4,463✔
409
        if d.isReplica() {
4,466✔
410
                return nil, ErrIsReplica
3✔
411
        }
3✔
412

413
        return d.set(ctx, req)
4,460✔
414
}
415

416
func (d *db) set(ctx context.Context, req *schema.SetRequest) (*schema.TxHeader, error) {
4,460✔
417
        if req == nil {
4,461✔
418
                return nil, ErrIllegalArguments
1✔
419
        }
1✔
420

421
        tx, err := d.newWriteOnlyTx(ctx)
4,459✔
422
        if err != nil {
4,459✔
423
                return nil, err
×
424
        }
×
425
        defer tx.Cancel()
4,459✔
426

4,459✔
427
        // Skip the dedup map and per-key sha256 hash for the common single-KV
4,459✔
428
        // case; dedup is only meaningful when there's more than one KV.
4,459✔
429
        var keys map[[sha256.Size]byte]struct{}
4,459✔
430
        if len(req.KVs) > 1 {
4,480✔
431
                keys = make(map[[sha256.Size]byte]struct{}, len(req.KVs))
21✔
432
        }
21✔
433

434
        for _, kv := range req.KVs {
10,212✔
435
                if len(kv.Key) == 0 {
5,755✔
436
                        return nil, ErrIllegalArguments
2✔
437
                }
2✔
438

439
                if keys != nil {
7,069✔
440
                        kid := sha256.Sum256(kv.Key)
1,318✔
441
                        if _, ok := keys[kid]; ok {
1,319✔
442
                                return nil, schema.ErrDuplicatedKeysNotSupported
1✔
443
                        }
1✔
444
                        keys[kid] = struct{}{}
1,317✔
445
                }
446

447
                e := EncodeEntrySpec(
5,750✔
448
                        kv.Key,
5,750✔
449
                        schema.KVMetadataFromProto(kv.Metadata),
5,750✔
450
                        kv.Value,
5,750✔
451
                )
5,750✔
452

5,750✔
453
                err = tx.Set(e.Key, e.Metadata, e.Value)
5,750✔
454
                if err != nil {
5,751✔
455
                        return nil, err
1✔
456
                }
1✔
457
        }
458

459
        for i := range req.Preconditions {
5,526✔
460
                c, err := PreconditionFromProto(req.Preconditions[i])
1,071✔
461
                if err != nil {
1,072✔
462
                        return nil, err
1✔
463
                }
1✔
464

465
                err = tx.AddPrecondition(c)
1,070✔
466
                if err != nil {
1,071✔
467
                        return nil, fmt.Errorf("%w: %v", store.ErrInvalidPrecondition, err)
1✔
468
                }
1✔
469
        }
470

471
        var hdr *store.TxHeader
4,453✔
472

4,453✔
473
        if req.NoWait {
4,453✔
474
                hdr, err = tx.AsyncCommit(ctx)
×
475
        } else {
4,453✔
476
                hdr, err = tx.Commit(ctx)
4,453✔
477
        }
4,453✔
478
        if err != nil {
4,485✔
479
                return nil, err
32✔
480
        }
32✔
481

482
        return schema.TxHeaderToProto(hdr), nil
4,421✔
483
}
484

485
func (d *db) newWriteOnlyTx(ctx context.Context) (*store.OngoingTx, error) {
4,459✔
486
        tx, err := d.st.NewWriteOnlyTx(ctx)
4,459✔
487
        if err != nil {
4,459✔
488
                return nil, err
×
489
        }
×
490
        return d.txWithMetadata(ctx, tx)
4,459✔
491
}
492

493
func (d *db) newTx(ctx context.Context, opts *store.TxOptions) (*store.OngoingTx, error) {
8✔
494
        tx, err := d.st.NewTx(ctx, opts)
8✔
495
        if err != nil {
8✔
496
                return nil, err
×
497
        }
×
498
        return d.txWithMetadata(ctx, tx)
8✔
499
}
500

501
func (d *db) txWithMetadata(ctx context.Context, tx *store.OngoingTx) (*store.OngoingTx, error) {
4,467✔
502
        meta := schema.MetadataFromContext(ctx)
4,467✔
503
        if len(meta) > 0 {
4,529✔
504
                txmd := store.NewTxMetadata()
62✔
505

62✔
506
                data, err := meta.Marshal()
62✔
507
                if err != nil {
62✔
508
                        return nil, err
×
509
                }
×
510

511
                if err := txmd.WithExtra(data); err != nil {
62✔
512
                        return nil, err
×
513
                }
×
514
                return tx.WithMetadata(txmd), nil
62✔
515
        }
516
        return tx, nil
4,405✔
517
}
518

519
func checkKeyRequest(req *schema.KeyRequest) error {
3,191✔
520
        if req == nil {
3,192✔
521
                return fmt.Errorf(
1✔
522
                        "%w: empty request",
1✔
523
                        ErrIllegalArguments,
1✔
524
                )
1✔
525
        }
1✔
526

527
        if len(req.Key) == 0 {
3,192✔
528
                return fmt.Errorf(
2✔
529
                        "%w: empty key",
2✔
530
                        ErrIllegalArguments,
2✔
531
                )
2✔
532
        }
2✔
533

534
        if req.AtTx > 0 {
3,213✔
535
                if req.SinceTx > 0 {
27✔
536
                        return fmt.Errorf(
2✔
537
                                "%w: SinceTx should not be specified when AtTx is used",
2✔
538
                                ErrIllegalArguments,
2✔
539
                        )
2✔
540
                }
2✔
541

542
                if req.AtRevision != 0 {
24✔
543
                        return fmt.Errorf(
1✔
544
                                "%w: AtRevision should not be specified when AtTx is used",
1✔
545
                                ErrIllegalArguments,
1✔
546
                        )
1✔
547
                }
1✔
548
        }
549

550
        return nil
3,185✔
551
}
552

553
// Get ...
554
func (d *db) Get(ctx context.Context, req *schema.KeyRequest) (*schema.Entry, error) {
3,187✔
555
        err := checkKeyRequest(req)
3,187✔
556
        if err != nil {
3,189✔
557
                return nil, err
2✔
558
        }
2✔
559

560
        currTxID, _ := d.st.CommittedAlh()
3,185✔
561
        if req.SinceTx > currTxID {
3,186✔
562
                return nil, fmt.Errorf(
1✔
563
                        "%w: SinceTx must not be greater than the current transaction ID",
1✔
564
                        ErrIllegalArguments,
1✔
565
                )
1✔
566
        }
1✔
567

568
        if !req.NoWait && req.AtTx == 0 {
6,346✔
569
                waitUntilTx := req.SinceTx
3,162✔
570
                if waitUntilTx == 0 {
5,227✔
571
                        waitUntilTx = currTxID
2,065✔
572
                }
2,065✔
573

574
                err := d.WaitForIndexingUpto(ctx, waitUntilTx)
3,162✔
575
                if err != nil {
3,164✔
576
                        return nil, err
2✔
577
                }
2✔
578
        }
579

580
        if req.AtRevision != 0 {
3,213✔
581
                return d.getAtRevision(ctx, EncodeKey(req.Key), req.AtRevision, true)
31✔
582
        }
31✔
583

584
        return d.getAtTx(ctx, EncodeKey(req.Key), req.AtTx, 0, d.st, 0, true)
3,151✔
585
}
586

587
func (d *db) get(ctx context.Context, key []byte, index store.KeyIndex, skipIntegrityCheck bool) (*schema.Entry, error) {
10✔
588
        return d.getAtTx(ctx, key, 0, 0, index, 0, skipIntegrityCheck)
10✔
589
}
10✔
590

591
func (d *db) getAtTx(
592
        ctx context.Context,
593
        key []byte,
594
        atTx uint64,
595
        resolved int,
596
        index store.KeyIndex,
597
        revision uint64,
598
        skipIntegrityCheck bool,
599
) (entry *schema.Entry, err error) {
7,083✔
600

7,083✔
601
        var txID uint64
7,083✔
602
        var val []byte
7,083✔
603
        var md *store.KVMetadata
7,083✔
604

7,083✔
605
        if atTx == 0 {
10,890✔
606
                valRef, err := index.Get(ctx, key)
3,807✔
607
                if err != nil {
3,891✔
608
                        return nil, err
84✔
609
                }
84✔
610

611
                txID = valRef.Tx()
3,723✔
612
                revision = valRef.HC()
3,723✔
613
                md = valRef.KVMetadata()
3,723✔
614

3,723✔
615
                val, err = valRef.Resolve()
3,723✔
616
                if err != nil {
3,727✔
617
                        return nil, err
4✔
618
                }
4✔
619
        } else {
3,276✔
620
                txID = atTx
3,276✔
621

3,276✔
622
                md, val, err = d.readMetadataAndValue(key, atTx, skipIntegrityCheck)
3,276✔
623
                if err != nil {
3,371✔
624
                        return nil, err
95✔
625
                }
95✔
626
        }
627

628
        return d.resolveValue(ctx, key, val, resolved, txID, md, index, revision, skipIntegrityCheck)
6,900✔
629
}
630

631
func (d *db) readMetadataAndValue(key []byte, atTx uint64, skipIntegrityCheck bool) (*store.KVMetadata, []byte, error) {
3,276✔
632
        entry, _, err := d.st.ReadTxEntry(atTx, key, skipIntegrityCheck)
3,276✔
633
        if err != nil {
3,362✔
634
                return nil, nil, err
86✔
635
        }
86✔
636

637
        v, err := d.st.ReadValue(entry)
3,190✔
638
        if err != nil {
3,199✔
639
                return nil, nil, err
9✔
640
        }
9✔
641

642
        return entry.Metadata(), v, nil
3,181✔
643
}
644

645
func (d *db) getAtRevision(ctx context.Context, key []byte, atRevision int64, skipIntegrityCheck bool) (entry *schema.Entry, err error) {
31✔
646
        var offset uint64
31✔
647
        var desc bool
31✔
648

31✔
649
        if atRevision > 0 {
47✔
650
                offset = uint64(atRevision) - 1
16✔
651
                desc = false
16✔
652
        } else {
31✔
653
                offset = -uint64(atRevision)
15✔
654
                desc = true
15✔
655
        }
15✔
656

657
        valRefs, hCount, err := d.st.History(key, offset, desc, 1)
31✔
658
        if errors.Is(err, store.ErrNoMoreEntries) || errors.Is(err, store.ErrOffsetOutOfRange) {
35✔
659
                return nil, ErrInvalidRevision
4✔
660
        }
4✔
661
        if err != nil {
28✔
662
                return nil, err
1✔
663
        }
1✔
664

665
        if atRevision < 0 {
39✔
666
                atRevision = int64(hCount) + atRevision
13✔
667
        }
13✔
668

669
        entry, err = d.getAtTx(ctx, key, valRefs[0].Tx(), 0, d.st, uint64(atRevision), skipIntegrityCheck)
26✔
670
        if err != nil {
27✔
671
                return nil, err
1✔
672
        }
1✔
673

674
        return entry, err
25✔
675
}
676

677
func (d *db) resolveValue(
678
        ctx context.Context,
679
        key []byte,
680
        val []byte,
681
        resolved int,
682
        txID uint64,
683
        md *store.KVMetadata,
684
        index store.KeyIndex,
685
        revision uint64,
686
        skipIntegrityCheck bool,
687
) (entry *schema.Entry, err error) {
6,909✔
688
        if md != nil && md.Deleted() {
6,911✔
689
                return nil, store.ErrKeyNotFound
2✔
690
        }
2✔
691

692
        if len(val) < 1 {
6,907✔
693
                return nil, fmt.Errorf("%w: internal value consistency error - missing value prefix", store.ErrCorruptedData)
×
694
        }
×
695

696
        // Reference lookup
697
        if val[0] == ReferenceValuePrefix {
7,119✔
698
                if len(val) < 1+8 {
212✔
699
                        return nil, fmt.Errorf("%w: internal value consistency error - invalid reference", store.ErrCorruptedData)
×
700
                }
×
701

702
                if resolved == MaxKeyResolutionLimit {
214✔
703
                        return nil, ErrKeyResolutionLimitReached
2✔
704
                }
2✔
705

706
                atTx := binary.BigEndian.Uint64(TrimPrefix(val))
210✔
707
                refKey := make([]byte, len(val)-1-8)
210✔
708
                copy(refKey, val[1+8:])
210✔
709

210✔
710
                if index != nil {
419✔
711
                        entry, err = d.getAtTx(ctx, refKey, atTx, resolved+1, index, 0, skipIntegrityCheck)
209✔
712
                        if err != nil {
210✔
713
                                return nil, err
1✔
714
                        }
1✔
715
                } else {
1✔
716
                        entry = &schema.Entry{
1✔
717
                                Key: TrimPrefix(refKey),
1✔
718
                                Tx:  atTx,
1✔
719
                        }
1✔
720
                }
1✔
721

722
                entry.ReferencedBy = &schema.Reference{
209✔
723
                        Tx:       txID,
209✔
724
                        Key:      TrimPrefix(key),
209✔
725
                        Metadata: schema.KVMetadataToProto(md),
209✔
726
                        AtTx:     atTx,
209✔
727
                        Revision: revision,
209✔
728
                }
209✔
729

209✔
730
                return entry, nil
209✔
731
        }
732

733
        return &schema.Entry{
6,695✔
734
                Tx:       txID,
6,695✔
735
                Key:      TrimPrefix(key),
6,695✔
736
                Metadata: schema.KVMetadataToProto(md),
6,695✔
737
                Value:    TrimPrefix(val),
6,695✔
738
                Revision: revision,
6,695✔
739
        }, nil
6,695✔
740
}
741

742
func (d *db) Health() (waitingCount int, lastReleaseAt time.Time) {
1✔
743
        return d.mutex.State()
1✔
744
}
1✔
745

746
// CurrentState ...
747
func (d *db) CurrentState() (*schema.ImmutableState, error) {
25,239✔
748
        lastTxID, lastTxAlh := d.st.CommittedAlh()
25,239✔
749
        lastPreTxID, lastPreTxAlh := d.st.PrecommittedAlh()
25,239✔
750

25,239✔
751
        return &schema.ImmutableState{
25,239✔
752
                TxId:               lastTxID,
25,239✔
753
                TxHash:             lastTxAlh[:],
25,239✔
754
                PrecommittedTxId:   lastPreTxID,
25,239✔
755
                PrecommittedTxHash: lastPreTxAlh[:],
25,239✔
756
        }, nil
25,239✔
757
}
25,239✔
758

759
// WaitForTx blocks caller until specified tx
760
func (d *db) WaitForTx(ctx context.Context, txID uint64, allowPrecommitted bool) error {
8,156✔
761
        return d.st.WaitForTx(ctx, txID, allowPrecommitted)
8,156✔
762
}
8,156✔
763

764
// WaitForIndexingUpto blocks caller until specified tx gets indexed
765
func (d *db) WaitForIndexingUpto(ctx context.Context, txID uint64) error {
3,528✔
766
        return d.st.WaitForIndexingUpto(ctx, txID)
3,528✔
767
}
3,528✔
768

769
// VerifiableSet ...
770
func (d *db) VerifiableSet(ctx context.Context, req *schema.VerifiableSetRequest) (*schema.VerifiableTx, error) {
49✔
771
        if req == nil {
50✔
772
                return nil, ErrIllegalArguments
1✔
773
        }
1✔
774

775
        lastTxID, _ := d.st.CommittedAlh()
48✔
776
        if lastTxID < req.ProveSinceTx {
49✔
777
                return nil, ErrIllegalState
1✔
778
        }
1✔
779

780
        // Preallocate tx buffers
781
        lastTx, err := d.allocTx()
47✔
782
        if err != nil {
47✔
783
                return nil, err
×
784
        }
×
785
        defer d.releaseTx(lastTx)
47✔
786

47✔
787
        txhdr, err := d.Set(ctx, req.SetRequest)
47✔
788
        if err != nil {
48✔
789
                return nil, err
1✔
790
        }
1✔
791

792
        err = d.st.ReadTx(uint64(txhdr.Id), false, lastTx)
46✔
793
        if err != nil {
46✔
794
                return nil, err
×
795
        }
×
796

797
        var prevTxHdr *store.TxHeader
46✔
798

46✔
799
        if req.ProveSinceTx == 0 {
66✔
800
                prevTxHdr = lastTx.Header()
20✔
801
        } else {
46✔
802
                prevTxHdr, err = d.st.ReadTxHeader(req.ProveSinceTx, false, false)
26✔
803
                if err != nil {
26✔
804
                        return nil, err
×
805
                }
×
806
        }
807

808
        dualProof, err := d.st.DualProof(prevTxHdr, lastTx.Header())
46✔
809
        if err != nil {
46✔
810
                return nil, err
×
811
        }
×
812

813
        return &schema.VerifiableTx{
46✔
814
                Tx:        schema.TxToProto(lastTx),
46✔
815
                DualProof: schema.DualProofToProto(dualProof),
46✔
816
        }, nil
46✔
817
}
818

819
// VerifiableGet ...
820
func (d *db) VerifiableGet(ctx context.Context, req *schema.VerifiableGetRequest) (*schema.VerifiableEntry, error) {
1,038✔
821
        if req == nil {
1,039✔
822
                return nil, ErrIllegalArguments
1✔
823
        }
1✔
824

825
        lastTxID, _ := d.st.CommittedAlh()
1,037✔
826
        if lastTxID < req.ProveSinceTx {
1,038✔
827
                return nil, ErrIllegalState
1✔
828
        }
1✔
829

830
        e, err := d.Get(ctx, req.KeyRequest)
1,036✔
831
        if err != nil {
1,040✔
832
                return nil, err
4✔
833
        }
4✔
834

835
        var vTxID uint64
1,032✔
836
        var vKey []byte
1,032✔
837

1,032✔
838
        if e.ReferencedBy == nil {
2,059✔
839
                vTxID = e.Tx
1,027✔
840
                vKey = e.Key
1,027✔
841
        } else {
1,032✔
842
                vTxID = e.ReferencedBy.Tx
5✔
843
                vKey = e.ReferencedBy.Key
5✔
844
        }
5✔
845

846
        // key-value inclusion proof
847
        tx, err := d.allocTx()
1,032✔
848
        if err != nil {
1,032✔
849
                return nil, err
×
850
        }
×
851
        defer d.releaseTx(tx)
1,032✔
852

1,032✔
853
        err = d.st.ReadTx(vTxID, false, tx)
1,032✔
854
        if err != nil {
1,032✔
855
                return nil, err
×
856
        }
×
857

858
        var rootTxHdr *store.TxHeader
1,032✔
859

1,032✔
860
        if req.ProveSinceTx == 0 {
2,043✔
861
                rootTxHdr = tx.Header()
1,011✔
862
        } else {
1,032✔
863
                rootTxHdr, err = d.st.ReadTxHeader(req.ProveSinceTx, false, false)
21✔
864
                if err != nil {
21✔
865
                        return nil, err
×
866
                }
×
867
        }
868

869
        inclusionProof, err := tx.Proof(EncodeKey(vKey))
1,032✔
870
        if err != nil {
1,032✔
871
                return nil, err
×
872
        }
×
873

874
        var sourceTxHdr, targetTxHdr *store.TxHeader
1,032✔
875

1,032✔
876
        if req.ProveSinceTx <= vTxID {
2,058✔
877
                sourceTxHdr = rootTxHdr
1,026✔
878
                targetTxHdr = tx.Header()
1,026✔
879
        } else {
1,032✔
880
                sourceTxHdr = tx.Header()
6✔
881
                targetTxHdr = rootTxHdr
6✔
882
        }
6✔
883

884
        dualProof, err := d.st.DualProof(sourceTxHdr, targetTxHdr)
1,032✔
885
        if err != nil {
1,032✔
886
                return nil, err
×
887
        }
×
888

889
        verifiableTx := &schema.VerifiableTx{
1,032✔
890
                Tx:        schema.TxToProto(tx),
1,032✔
891
                DualProof: schema.DualProofToProto(dualProof),
1,032✔
892
        }
1,032✔
893

1,032✔
894
        return &schema.VerifiableEntry{
1,032✔
895
                Entry:          e,
1,032✔
896
                VerifiableTx:   verifiableTx,
1,032✔
897
                InclusionProof: schema.InclusionProofToProto(inclusionProof),
1,032✔
898
        }, nil
1,032✔
899
}
900

901
func (d *db) Delete(ctx context.Context, req *schema.DeleteKeysRequest) (*schema.TxHeader, error) {
10✔
902
        if req == nil {
11✔
903
                return nil, ErrIllegalArguments
1✔
904
        }
1✔
905

906
        d.mutex.RLock()
9✔
907
        defer d.mutex.RUnlock()
9✔
908

9✔
909
        if d.isReplica() {
9✔
910
                return nil, ErrIsReplica
×
911
        }
×
912

913
        opts := store.DefaultTxOptions()
9✔
914

9✔
915
        if req.SinceTx > 0 {
10✔
916
                if req.SinceTx > d.st.LastPrecommittedTxID() {
2✔
917
                        return nil, store.ErrTxNotFound
1✔
918
                }
1✔
919

920
                opts.WithSnapshotMustIncludeTxID(func(_ uint64) uint64 {
×
921
                        return req.SinceTx
×
922
                })
×
923
        }
924

925
        tx, err := d.newTx(ctx, opts)
8✔
926
        if err != nil {
8✔
927
                return nil, err
×
928
        }
×
929
        defer tx.Cancel()
8✔
930

8✔
931
        for _, k := range req.Keys {
14✔
932
                if len(k) == 0 {
6✔
933
                        return nil, ErrIllegalArguments
×
934
                }
×
935

936
                md := store.NewKVMetadata()
6✔
937

6✔
938
                md.AsDeleted(true)
6✔
939

6✔
940
                e := EncodeEntrySpec(k, md, nil)
6✔
941

6✔
942
                err = tx.Delete(ctx, e.Key)
6✔
943
                if err != nil {
7✔
944
                        return nil, err
1✔
945
                }
1✔
946
        }
947

948
        var hdr *store.TxHeader
7✔
949
        if req.NoWait {
7✔
950
                hdr, err = tx.AsyncCommit(ctx)
×
951
        } else {
7✔
952
                hdr, err = tx.Commit(ctx)
7✔
953
        }
7✔
954
        if err != nil {
9✔
955
                return nil, err
2✔
956
        }
2✔
957

958
        return schema.TxHeaderToProto(hdr), nil
5✔
959
}
960

961
// GetAll ...
962
func (d *db) GetAll(ctx context.Context, req *schema.KeyListRequest) (*schema.Entries, error) {
4✔
963
        snap, err := d.snapshotSince(ctx, []byte{SetKeyPrefix}, req.SinceTx)
4✔
964
        if err != nil {
4✔
965
                return nil, err
×
966
        }
×
967
        defer snap.Close()
4✔
968

4✔
969
        list := &schema.Entries{}
4✔
970

4✔
971
        for _, key := range req.Keys {
14✔
972
                e, err := d.get(ctx, EncodeKey(key), snap, true)
10✔
973
                if err == nil || errors.Is(err, store.ErrKeyNotFound) {
20✔
974
                        if e != nil {
19✔
975
                                list.Entries = append(list.Entries, e)
9✔
976
                        }
9✔
977
                } else {
×
978
                        return nil, err
×
979
                }
×
980
        }
981

982
        return list, nil
4✔
983
}
984

985
func (d *db) Size() (uint64, error) {
2✔
986
        return d.st.Size()
2✔
987
}
2✔
988

989
// TxCount ...
990
func (d *db) TxCount() (uint64, error) {
302✔
991
        return d.st.TxCount(), nil
302✔
992
}
302✔
993

994
// Count ...
995
func (d *db) Count(ctx context.Context, prefix *schema.KeyPrefix) (*schema.EntryCount, error) {
8✔
996
        if prefix == nil {
10✔
997
                return nil, ErrIllegalArguments
2✔
998
        }
2✔
999

1000
        tx, err := d.st.NewTx(ctx, store.DefaultTxOptions().WithMode(store.ReadOnlyTx))
6✔
1001
        if err != nil {
6✔
1002
                return nil, err
×
1003
        }
×
1004
        defer tx.Cancel()
6✔
1005

6✔
1006
        keyReader, err := tx.NewKeyReader(store.KeyReaderSpec{
6✔
1007
                Prefix: WrapWithPrefix(prefix.Prefix, SetKeyPrefix),
6✔
1008
        })
6✔
1009
        if err != nil {
6✔
1010
                return nil, err
×
1011
        }
×
1012
        defer keyReader.Close()
6✔
1013

6✔
1014
        count := 0
6✔
1015

6✔
1016
        for {
19✔
1017
                _, _, err := keyReader.Read(ctx)
13✔
1018
                if errors.Is(err, store.ErrNoMoreEntries) {
19✔
1019
                        break
6✔
1020
                }
1021
                if err != nil {
7✔
1022
                        return nil, err
×
1023
                }
×
1024

1025
                count++
7✔
1026
        }
1027

1028
        return &schema.EntryCount{Count: uint64(count)}, nil
6✔
1029
}
1030

1031
// CountAll ...
1032
func (d *db) CountAll(ctx context.Context) (*schema.EntryCount, error) {
4✔
1033
        return d.Count(ctx, &schema.KeyPrefix{})
4✔
1034
}
4✔
1035

1036
// TxByID ...
1037
func (d *db) TxByID(ctx context.Context, req *schema.TxRequest) (*schema.Tx, error) {
62✔
1038
        if req == nil {
63✔
1039
                return nil, ErrIllegalArguments
1✔
1040
        }
1✔
1041

1042
        var snap *store.Snapshot
61✔
1043
        var err error
61✔
1044

61✔
1045
        tx, err := d.allocTx()
61✔
1046
        if err != nil {
61✔
1047
                return nil, err
×
1048
        }
×
1049
        defer d.releaseTx(tx)
61✔
1050

61✔
1051
        if !req.KeepReferencesUnresolved {
120✔
1052
                snap, err = d.snapshotSince(ctx, []byte{SetKeyPrefix}, req.SinceTx)
59✔
1053
                if err != nil {
59✔
1054
                        return nil, err
×
1055
                }
×
1056
                defer snap.Close()
59✔
1057
        }
1058

1059
        // key-value inclusion proof
1060
        err = d.st.ReadTx(req.Tx, false, tx)
61✔
1061
        if err != nil {
61✔
1062
                return nil, err
×
1063
        }
×
1064

1065
        return d.serializeTx(ctx, tx, req.EntriesSpec, snap, true)
61✔
1066
}
1067

1068
func (d *db) snapshotSince(ctx context.Context, prefix []byte, txID uint64) (*store.Snapshot, error) {
1,839✔
1069
        currTxID, _ := d.st.CommittedAlh()
1,839✔
1070

1,839✔
1071
        if txID > currTxID {
1,839✔
1072
                return nil, ErrIllegalArguments
×
1073
        }
×
1074

1075
        waitUntilTx := txID
1,839✔
1076
        if waitUntilTx == 0 {
3,612✔
1077
                waitUntilTx = currTxID
1,773✔
1078
        }
1,773✔
1079

1080
        return d.st.SnapshotMustIncludeTxID(ctx, prefix, waitUntilTx)
1,839✔
1081
}
1082

1083
func (d *db) serializeTx(ctx context.Context, tx *store.Tx, spec *schema.EntriesSpec, snap *store.Snapshot, skipIntegrityCheck bool) (*schema.Tx, error) {
1,377✔
1084
        if spec == nil {
2,244✔
1085
                return schema.TxToProto(tx), nil
867✔
1086
        }
867✔
1087

1088
        stx := &schema.Tx{
510✔
1089
                Header: schema.TxHeaderToProto(tx.Header()),
510✔
1090
        }
510✔
1091

510✔
1092
        for _, e := range tx.Entries() {
1,048✔
1093
                switch e.Key()[0] {
538✔
1094
                case SetKeyPrefix:
39✔
1095
                        {
78✔
1096
                                if spec.KvEntriesSpec == nil || spec.KvEntriesSpec.Action == schema.EntryTypeAction_EXCLUDE {
57✔
1097
                                        break
18✔
1098
                                }
1099

1100
                                if spec.KvEntriesSpec.Action == schema.EntryTypeAction_ONLY_DIGEST {
24✔
1101
                                        stx.Entries = append(stx.Entries, schema.TxEntryToProto(e))
3✔
1102
                                        break
3✔
1103
                                }
1104

1105
                                v, err := d.st.ReadValue(e)
18✔
1106
                                if errors.Is(err, store.ErrExpiredEntry) {
18✔
1107
                                        break
×
1108
                                }
1109
                                if err != nil {
18✔
1110
                                        return nil, err
×
1111
                                }
×
1112

1113
                                if spec.KvEntriesSpec.Action == schema.EntryTypeAction_RAW_VALUE {
27✔
1114
                                        kve := schema.TxEntryToProto(e)
9✔
1115
                                        kve.Value = v
9✔
1116
                                        stx.Entries = append(stx.Entries, kve)
9✔
1117
                                        break
9✔
1118
                                }
1119

1120
                                // resolve entry
1121
                                var index store.KeyIndex
9✔
1122
                                if snap != nil {
16✔
1123
                                        index = snap
7✔
1124
                                }
7✔
1125

1126
                                kve, err := d.resolveValue(ctx, e.Key(), v, 0, tx.Header().ID, e.Metadata(), index, 0, skipIntegrityCheck)
9✔
1127
                                if errors.Is(err, store.ErrKeyNotFound) || errors.Is(err, store.ErrExpiredEntry) {
10✔
1128
                                        break // ignore deleted ones (referenced key may have been deleted)
1✔
1129
                                }
1130
                                if err != nil {
8✔
1131
                                        return nil, err
×
1132
                                }
×
1133

1134
                                stx.KvEntries = append(stx.KvEntries, kve)
8✔
1135
                        }
1136
                case SortedSetKeyPrefix:
14✔
1137
                        {
28✔
1138
                                if spec.ZEntriesSpec == nil || spec.ZEntriesSpec.Action == schema.EntryTypeAction_EXCLUDE {
22✔
1139
                                        break
8✔
1140
                                }
1141

1142
                                if spec.ZEntriesSpec.Action == schema.EntryTypeAction_ONLY_DIGEST {
7✔
1143
                                        stx.Entries = append(stx.Entries, schema.TxEntryToProto(e))
1✔
1144
                                        break
1✔
1145
                                }
1146

1147
                                if spec.ZEntriesSpec.Action == schema.EntryTypeAction_RAW_VALUE {
6✔
1148
                                        v, err := d.st.ReadValue(e)
1✔
1149
                                        if errors.Is(err, store.ErrExpiredEntry) {
1✔
1150
                                                break
×
1151
                                        }
1152
                                        if err != nil {
1✔
1153
                                                return nil, err
×
1154
                                        }
×
1155

1156
                                        kve := schema.TxEntryToProto(e)
1✔
1157
                                        kve.Value = v
1✔
1158
                                        stx.Entries = append(stx.Entries, kve)
1✔
1159
                                        break
1✔
1160
                                }
1161

1162
                                // zKey = [1+setLenLen+set+scoreLen+keyLenLen+1+key+txIDLen]
1163
                                zKey := e.Key()
4✔
1164

4✔
1165
                                setLen := int(binary.BigEndian.Uint64(zKey[1:]))
4✔
1166
                                set := make([]byte, setLen)
4✔
1167
                                copy(set, zKey[1+setLenLen:])
4✔
1168

4✔
1169
                                scoreOff := 1 + setLenLen + setLen
4✔
1170
                                scoreB := binary.BigEndian.Uint64(zKey[scoreOff:])
4✔
1171
                                score := math.Float64frombits(scoreB)
4✔
1172

4✔
1173
                                keyOff := scoreOff + scoreLen + keyLenLen
4✔
1174
                                key := make([]byte, len(zKey)-keyOff-txIDLen)
4✔
1175
                                copy(key, zKey[keyOff:])
4✔
1176

4✔
1177
                                atTx := binary.BigEndian.Uint64(zKey[keyOff+len(key):])
4✔
1178

4✔
1179
                                var entry *schema.Entry
4✔
1180
                                var err error
4✔
1181

4✔
1182
                                if snap != nil {
7✔
1183
                                        entry, err = d.getAtTx(ctx, key, atTx, 1, snap, 0, skipIntegrityCheck)
3✔
1184
                                        if errors.Is(err, store.ErrKeyNotFound) || errors.Is(err, store.ErrExpiredEntry) {
3✔
1185
                                                break // ignore deleted ones (referenced key may have been deleted)
×
1186
                                        }
1187
                                        if err != nil {
3✔
1188
                                                return nil, err
×
1189
                                        }
×
1190
                                }
1191

1192
                                zentry := &schema.ZEntry{
4✔
1193
                                        Set:   set,
4✔
1194
                                        Key:   key[1:],
4✔
1195
                                        Entry: entry,
4✔
1196
                                        Score: score,
4✔
1197
                                        AtTx:  atTx,
4✔
1198
                                }
4✔
1199

4✔
1200
                                stx.ZEntries = append(stx.ZEntries, zentry)
4✔
1201
                        }
1202
                case SQLPrefix:
5✔
1203
                        {
10✔
1204
                                if spec.SqlEntriesSpec == nil || spec.SqlEntriesSpec.Action == schema.EntryTypeAction_EXCLUDE {
7✔
1205
                                        break
2✔
1206
                                }
1207

1208
                                if spec.SqlEntriesSpec.Action == schema.EntryTypeAction_ONLY_DIGEST {
4✔
1209
                                        stx.Entries = append(stx.Entries, schema.TxEntryToProto(e))
1✔
1210
                                        break
1✔
1211
                                }
1212

1213
                                if spec.SqlEntriesSpec.Action == schema.EntryTypeAction_RAW_VALUE {
3✔
1214
                                        v, err := d.st.ReadValue(e)
1✔
1215
                                        if errors.Is(err, store.ErrExpiredEntry) {
1✔
1216
                                                break
×
1217
                                        }
1218
                                        if err != nil {
1✔
1219
                                                return nil, err
×
1220
                                        }
×
1221

1222
                                        kve := schema.TxEntryToProto(e)
1✔
1223
                                        kve.Value = v
1✔
1224
                                        stx.Entries = append(stx.Entries, kve)
1✔
1225
                                        break
1✔
1226
                                }
1227

1228
                                return nil, fmt.Errorf("%w: sql entry resolution is not supported", ErrIllegalArguments)
1✔
1229
                        }
1230
                }
1231
        }
1232

1233
        return stx, nil
509✔
1234
}
1235

1236
func (d *db) mayUpdateReplicaState(committedTxID uint64, newReplicaState *schema.ReplicaState) error {
6,514✔
1237
        d.replicaStatesMutex.Lock()
6,514✔
1238
        defer d.replicaStatesMutex.Unlock()
6,514✔
1239

6,514✔
1240
        // clean up replicaStates
6,514✔
1241
        // it's safe to remove up to latest tx committed in primary
6,514✔
1242
        for uuid, st := range d.replicaStates {
6,949✔
1243
                if st.precommittedTxID <= committedTxID {
739✔
1244
                        delete(d.replicaStates, uuid)
304✔
1245
                }
304✔
1246
        }
1247

1248
        if newReplicaState.PrecommittedTxID <= committedTxID {
12,679✔
1249
                // as far as the primary is concerned, nothing really new has happened
6,165✔
1250
                return nil
6,165✔
1251
        }
6,165✔
1252

1253
        newReplicaAlh := schema.DigestFromProto(newReplicaState.PrecommittedAlh)
349✔
1254

349✔
1255
        replicaSt, ok := d.replicaStates[newReplicaState.UUID]
349✔
1256
        if ok {
388✔
1257
                if newReplicaState.PrecommittedTxID < replicaSt.precommittedTxID {
39✔
1258
                        return fmt.Errorf("%w: the newly informed replica state lags behind the previously informed one", ErrIllegalArguments)
×
1259
                }
×
1260

1261
                if newReplicaState.PrecommittedTxID == replicaSt.precommittedTxID {
77✔
1262
                        // as of the last informed replica status update, nothing has changed
38✔
1263
                        return nil
38✔
1264
                }
38✔
1265

1266
                // actual replication progress is informed by the replica
1267
                replicaSt.precommittedTxID = newReplicaState.PrecommittedTxID
1✔
1268
                replicaSt.precommittedAlh = newReplicaAlh
1✔
1269
        } else {
310✔
1270
                // replica informs first replication state
310✔
1271
                d.replicaStates[newReplicaState.UUID] = &replicaState{
310✔
1272
                        precommittedTxID: newReplicaState.PrecommittedTxID,
310✔
1273
                        precommittedAlh:  newReplicaAlh,
310✔
1274
                }
310✔
1275
        }
310✔
1276

1277
        // check up to which tx enough replicas ack replication and it's safe to commit
1278
        mayCommitUpToTxID := uint64(0)
311✔
1279
        if len(d.replicaStates) > 0 {
622✔
1280
                mayCommitUpToTxID = math.MaxUint64
311✔
1281
        }
311✔
1282

1283
        allowances := 0
311✔
1284

311✔
1285
        // we may clean up replicaStates from those who are lagging behind commit
311✔
1286
        for _, st := range d.replicaStates {
701✔
1287
                if st.precommittedTxID < mayCommitUpToTxID {
701✔
1288
                        mayCommitUpToTxID = st.precommittedTxID
311✔
1289
                }
311✔
1290
                allowances++
390✔
1291
        }
1292

1293
        if allowances >= d.options.syncAcks {
591✔
1294
                err := d.st.AllowCommitUpto(mayCommitUpToTxID)
280✔
1295
                if err != nil {
280✔
1296
                        return err
×
1297
                }
×
1298
        }
1299

1300
        return nil
311✔
1301
}
1302

1303
func (d *db) ExportTxByID(ctx context.Context, req *schema.ExportTxRequest) (txbs []byte, mayCommitUpToTxID uint64, mayCommitUpToAlh [sha256.Size]byte, err error) {
8,157✔
1304
        if req == nil {
8,157✔
1305
                return nil, 0, mayCommitUpToAlh, ErrIllegalArguments
×
1306
        }
×
1307

1308
        if d.replicaStates == nil && req.ReplicaState != nil {
8,157✔
1309
                return nil, 0, mayCommitUpToAlh, fmt.Errorf("%w: replica state was NOT expected", ErrIllegalState)
×
1310
        }
×
1311

1312
        tx, err := d.allocTx()
8,157✔
1313
        if err != nil {
8,157✔
1314
                return nil, 0, mayCommitUpToAlh, err
×
1315
        }
×
1316
        defer d.releaseTx(tx)
8,157✔
1317

8,157✔
1318
        committedTxID, committedAlh := d.st.CommittedAlh()
8,157✔
1319
        preCommittedTxID, _ := d.st.PrecommittedAlh()
8,157✔
1320

8,157✔
1321
        if req.ReplicaState != nil {
14,673✔
1322
                if req.ReplicaState.CommittedTxID > 0 {
12,512✔
1323
                        // validate replica commit state
5,996✔
1324
                        if req.ReplicaState.CommittedTxID > committedTxID {
5,996✔
1325
                                return nil, committedTxID, committedAlh, fmt.Errorf("%w: replica commit state diverged from primary's", ErrReplicaDivergedFromPrimary)
×
1326
                        }
×
1327

1328
                        // integrityCheck is currently required to validate Alh
1329
                        expectedReplicaCommitHdr, err := d.st.ReadTxHeader(req.ReplicaState.CommittedTxID, false, false)
5,996✔
1330
                        if err != nil {
5,996✔
1331
                                return nil, committedTxID, committedAlh, err
×
1332
                        }
×
1333

1334
                        replicaCommittedAlh := schema.DigestFromProto(req.ReplicaState.CommittedAlh)
5,996✔
1335

5,996✔
1336
                        if expectedReplicaCommitHdr.Alh() != replicaCommittedAlh {
5,996✔
1337
                                return nil, expectedReplicaCommitHdr.ID, expectedReplicaCommitHdr.Alh(), fmt.Errorf("%w: replica commit state diverged from primary's", ErrReplicaDivergedFromPrimary)
×
1338
                        }
×
1339
                }
1340

1341
                if req.ReplicaState.PrecommittedTxID > 0 {
12,719✔
1342
                        // validate replica precommit state
6,203✔
1343
                        if req.ReplicaState.PrecommittedTxID > preCommittedTxID {
6,205✔
1344
                                return nil, committedTxID, committedAlh, fmt.Errorf("%w: replica precommit state diverged from primary's", ErrReplicaDivergedFromPrimary)
2✔
1345
                        }
2✔
1346

1347
                        // integrityCheck is currently required to validate Alh
1348
                        expectedReplicaPrecommitHdr, err := d.st.ReadTxHeader(req.ReplicaState.PrecommittedTxID, true, false)
6,201✔
1349
                        if err != nil {
6,201✔
1350
                                return nil, committedTxID, committedAlh, err
×
1351
                        }
×
1352

1353
                        replicaPreCommittedAlh := schema.DigestFromProto(req.ReplicaState.PrecommittedAlh)
6,201✔
1354

6,201✔
1355
                        if expectedReplicaPrecommitHdr.Alh() != replicaPreCommittedAlh {
6,201✔
1356
                                return nil, expectedReplicaPrecommitHdr.ID, expectedReplicaPrecommitHdr.Alh(), fmt.Errorf("%w: replica precommit state diverged from primary's", ErrReplicaDivergedFromPrimary)
×
1357
                        }
×
1358

1359
                        // primary will provide commit state to the replica so it can commit pre-committed transactions
1360
                        if req.ReplicaState.PrecommittedTxID < committedTxID {
7,550✔
1361
                                // if replica is behind current commit state in primary
1,349✔
1362
                                // return the alh up to the point known by the replica.
1,349✔
1363
                                // That way the replica is able to validate is following the right primary.
1,349✔
1364
                                mayCommitUpToTxID = req.ReplicaState.PrecommittedTxID
1,349✔
1365
                                mayCommitUpToAlh = replicaPreCommittedAlh
1,349✔
1366
                        } else {
6,201✔
1367
                                mayCommitUpToTxID = committedTxID
4,852✔
1368
                                mayCommitUpToAlh = committedAlh
4,852✔
1369
                        }
4,852✔
1370
                }
1371

1372
                err = d.mayUpdateReplicaState(committedTxID, req.ReplicaState)
6,514✔
1373
                if err != nil {
6,514✔
1374
                        return nil, mayCommitUpToTxID, mayCommitUpToAlh, err
×
1375
                }
×
1376
        }
1377

1378
        // it might be the case primary will commit some txs (even there could be inmem-precommitted txs)
1379
        // current timeout it's not a special value but at least a relative one
1380
        // note: primary might also be waiting ack from any replica (even this primary may do progress)
1381

1382
        // TODO: under some circumstances, replica might not be able to do further progress until primary
1383
        // has made changes, such wait doesn't need to have a timeout, reducing networking and CPU utilization
1384
        var cancel context.CancelFunc
8,155✔
1385

8,155✔
1386
        if req.ReplicaState != nil {
14,669✔
1387
                ctx, cancel = context.WithTimeout(ctx, d.options.storeOpts.SyncFrequency*4)
6,514✔
1388
                defer cancel()
6,514✔
1389
        }
6,514✔
1390

1391
        err = d.WaitForTx(ctx, req.Tx, req.AllowPreCommitted)
8,155✔
1392
        if ctx.Err() != nil {
8,800✔
1393
                return nil, mayCommitUpToTxID, mayCommitUpToAlh, nil
645✔
1394
        }
645✔
1395
        if err != nil {
7,510✔
1396
                return nil, mayCommitUpToTxID, mayCommitUpToAlh, err
×
1397
        }
×
1398

1399
        txbs, err = d.st.ExportTx(req.Tx, req.AllowPreCommitted, req.SkipIntegrityCheck, tx)
7,510✔
1400
        if err != nil {
7,510✔
1401
                return nil, mayCommitUpToTxID, mayCommitUpToAlh, err
×
1402
        }
×
1403

1404
        return txbs, mayCommitUpToTxID, mayCommitUpToAlh, nil
7,510✔
1405
}
1406

1407
func (d *db) ReplicateTx(ctx context.Context, exportedTx []byte, skipIntegrityCheck bool, waitForIndexing bool) (*schema.TxHeader, error) {
7,440✔
1408
        d.mutex.RLock()
7,440✔
1409
        defer d.mutex.RUnlock()
7,440✔
1410

7,440✔
1411
        if !d.isReplica() {
7,440✔
1412
                return nil, ErrNotReplica
×
1413
        }
×
1414

1415
        hdr, err := d.st.ReplicateTx(ctx, exportedTx, skipIntegrityCheck, waitForIndexing)
7,440✔
1416
        if err != nil {
7,470✔
1417
                return nil, err
30✔
1418
        }
30✔
1419

1420
        return schema.TxHeaderToProto(hdr), nil
7,410✔
1421
}
1422

1423
// AllowCommitUpto is used by replicas to commit transactions once committed in primary
1424
func (d *db) AllowCommitUpto(txID uint64, alh [sha256.Size]byte) error {
5,860✔
1425
        d.mutex.RLock()
5,860✔
1426
        defer d.mutex.RUnlock()
5,860✔
1427

5,860✔
1428
        if !d.isReplica() {
5,860✔
1429
                return ErrNotReplica
×
1430
        }
×
1431

1432
        // replica pre-committed state must be consistent with primary
1433

1434
        committedTxID, committedAlh := d.st.CommittedAlh()
5,860✔
1435
        // handling a particular case in an optimized manner
5,860✔
1436
        if committedTxID == txID {
6,157✔
1437
                if committedAlh != alh {
297✔
1438
                        return fmt.Errorf("%w: replica commit state diverged from primary's", ErrIllegalState)
×
1439
                }
×
1440
                return nil
297✔
1441
        }
1442

1443
        hdr, err := d.st.ReadTxHeader(txID, true, false)
5,563✔
1444
        if err != nil {
5,563✔
1445
                return err
×
1446
        }
×
1447

1448
        if hdr.Alh() != alh {
5,563✔
1449
                return fmt.Errorf("%w: replica commit state diverged from primary's", ErrIllegalState)
×
1450
        }
×
1451

1452
        return d.st.AllowCommitUpto(txID)
5,563✔
1453
}
1454

1455
func (d *db) DiscardPrecommittedTxsSince(txID uint64) error {
2✔
1456
        d.mutex.RLock()
2✔
1457
        defer d.mutex.RUnlock()
2✔
1458

2✔
1459
        _, err := d.st.DiscardPrecommittedTxsSince(txID)
2✔
1460

2✔
1461
        return err
2✔
1462
}
2✔
1463

1464
// VerifiableTxByID ...
1465
func (d *db) VerifiableTxByID(ctx context.Context, req *schema.VerifiableTxRequest) (*schema.VerifiableTx, error) {
1,299✔
1466
        if req == nil {
1,300✔
1467
                return nil, ErrIllegalArguments
1✔
1468
        }
1✔
1469

1470
        lastTxID, _ := d.st.CommittedAlh()
1,298✔
1471
        if lastTxID < req.ProveSinceTx {
1,298✔
1472
                return nil, fmt.Errorf("%w: latest txID=%d is lower than specified as initial tx=%d", ErrIllegalState, lastTxID, req.ProveSinceTx)
×
1473
        }
×
1474

1475
        var snap *store.Snapshot
1,298✔
1476
        var err error
1,298✔
1477

1,298✔
1478
        if !req.KeepReferencesUnresolved {
2,596✔
1479
                snap, err = d.snapshotSince(ctx, []byte{SetKeyPrefix}, req.SinceTx)
1,298✔
1480
                if err != nil {
1,298✔
1481
                        return nil, err
×
1482
                }
×
1483
                defer snap.Close()
1,298✔
1484
        }
1485

1486
        reqTx, err := d.allocTx()
1,298✔
1487
        if err != nil {
1,298✔
1488
                return nil, err
×
1489
        }
×
1490
        defer d.releaseTx(reqTx)
1,298✔
1491

1,298✔
1492
        err = d.st.ReadTx(req.Tx, false, reqTx)
1,298✔
1493
        if err != nil {
1,298✔
1494
                return nil, err
×
1495
        }
×
1496

1497
        var sourceTxHdr, targetTxHdr *store.TxHeader
1,298✔
1498
        var rootTxHdr *store.TxHeader
1,298✔
1499

1,298✔
1500
        if req.ProveSinceTx == 0 {
1,302✔
1501
                rootTxHdr = reqTx.Header()
4✔
1502
        } else {
1,298✔
1503
                rootTxHdr, err = d.st.ReadTxHeader(req.ProveSinceTx, false, false)
1,294✔
1504
                if err != nil {
1,294✔
1505
                        return nil, err
×
1506
                }
×
1507
        }
1508

1509
        if req.ProveSinceTx <= req.Tx {
2,595✔
1510
                sourceTxHdr = rootTxHdr
1,297✔
1511
                targetTxHdr = reqTx.Header()
1,297✔
1512
        } else {
1,298✔
1513
                sourceTxHdr = reqTx.Header()
1✔
1514
                targetTxHdr = rootTxHdr
1✔
1515
        }
1✔
1516

1517
        dualProof, err := d.st.DualProof(sourceTxHdr, targetTxHdr)
1,298✔
1518
        if err != nil {
1,298✔
1519
                return nil, err
×
1520
        }
×
1521

1522
        sReqTx, err := d.serializeTx(ctx, reqTx, req.EntriesSpec, snap, true)
1,298✔
1523
        if err != nil {
1,298✔
1524
                return nil, err
×
1525
        }
×
1526

1527
        return &schema.VerifiableTx{
1,298✔
1528
                Tx:        sReqTx,
1,298✔
1529
                DualProof: schema.DualProofToProto(dualProof),
1,298✔
1530
        }, nil
1,298✔
1531
}
1532

1533
// TxScan ...
1534
func (d *db) TxScan(ctx context.Context, req *schema.TxScanRequest) (*schema.TxList, error) {
10✔
1535
        if req == nil {
11✔
1536
                return nil, ErrIllegalArguments
1✔
1537
        }
1✔
1538

1539
        if int(req.Limit) > d.maxResultSize {
10✔
1540
                return nil, fmt.Errorf("%w: the specified limit (%d) is larger than the maximum allowed one (%d)",
1✔
1541
                        ErrResultSizeLimitExceeded, req.Limit, d.maxResultSize)
1✔
1542
        }
1✔
1543

1544
        tx, err := d.allocTx()
8✔
1545
        if err != nil {
8✔
1546
                return nil, err
×
1547
        }
×
1548
        defer d.releaseTx(tx)
8✔
1549

8✔
1550
        limit := int(req.Limit)
8✔
1551
        if req.Limit == 0 {
13✔
1552
                limit = d.maxResultSize
5✔
1553
        }
5✔
1554

1555
        snap, err := d.snapshotSince(ctx, []byte{SetKeyPrefix}, req.SinceTx)
8✔
1556
        if err != nil {
8✔
1557
                return nil, err
×
1558
        }
×
1559
        defer snap.Close()
8✔
1560

8✔
1561
        txReader, err := d.st.NewTxReader(req.InitialTx, req.Desc, tx)
8✔
1562
        if err != nil {
9✔
1563
                return nil, err
1✔
1564
        }
1✔
1565

1566
        txList := &schema.TxList{}
7✔
1567

7✔
1568
        for l := 1; l <= limit; l++ {
27✔
1569
                tx, err := txReader.Read()
20✔
1570
                if errors.Is(err, store.ErrNoMoreEntries) {
22✔
1571
                        break
2✔
1572
                }
1573
                if err != nil {
18✔
1574
                        return nil, err
×
1575
                }
×
1576

1577
                sTx, err := d.serializeTx(ctx, tx, req.EntriesSpec, snap, true)
18✔
1578
                if err != nil {
18✔
1579
                        return nil, err
×
1580
                }
×
1581

1582
                txList.Txs = append(txList.Txs, sTx)
18✔
1583
        }
1584

1585
        return txList, nil
7✔
1586
}
1587

1588
// History ...
1589
func (d *db) History(ctx context.Context, req *schema.HistoryRequest) (*schema.Entries, error) {
8✔
1590
        if req == nil {
9✔
1591
                return nil, ErrIllegalArguments
1✔
1592
        }
1✔
1593

1594
        if int(req.Limit) > d.maxResultSize {
8✔
1595
                return nil, fmt.Errorf("%w: the specified limit (%d) is larger than the maximum allowed one (%d)",
1✔
1596
                        ErrResultSizeLimitExceeded, req.Limit, d.maxResultSize)
1✔
1597
        }
1✔
1598

1599
        currTxID, _ := d.st.CommittedAlh()
6✔
1600

6✔
1601
        if req.SinceTx > currTxID {
6✔
1602
                return nil, ErrIllegalArguments
×
1603
        }
×
1604

1605
        waitUntilTx := req.SinceTx
6✔
1606
        if waitUntilTx == 0 {
7✔
1607
                waitUntilTx = currTxID
1✔
1608
        }
1✔
1609

1610
        err := d.WaitForIndexingUpto(ctx, waitUntilTx)
6✔
1611
        if err != nil {
6✔
1612
                return nil, err
×
1613
        }
×
1614

1615
        limit := int(req.Limit)
6✔
1616
        if limit == 0 {
12✔
1617
                limit = d.maxResultSize
6✔
1618
        }
6✔
1619

1620
        key := EncodeKey(req.Key)
6✔
1621

6✔
1622
        valRefs, _, err := d.st.History(key, req.Offset, req.Desc, limit)
6✔
1623
        if err != nil && err != store.ErrOffsetOutOfRange {
6✔
1624
                return nil, err
×
1625
        }
×
1626

1627
        list := &schema.Entries{
6✔
1628
                Entries: make([]*schema.Entry, len(valRefs)),
6✔
1629
        }
6✔
1630

6✔
1631
        for i, valRef := range valRefs {
114✔
1632
                val, err := valRef.Resolve()
108✔
1633
                if err != nil && err != store.ErrExpiredEntry {
108✔
1634
                        return nil, err
×
1635
                }
×
1636
                if len(val) > 0 {
214✔
1637
                        val = TrimPrefix(val)
106✔
1638
                }
106✔
1639

1640
                list.Entries[i] = &schema.Entry{
108✔
1641
                        Tx:       valRef.Tx(),
108✔
1642
                        Key:      req.Key,
108✔
1643
                        Metadata: schema.KVMetadataToProto(valRef.KVMetadata()),
108✔
1644
                        Value:    val,
108✔
1645
                        Expired:  errors.Is(err, store.ErrExpiredEntry),
108✔
1646
                        Revision: valRef.HC(),
108✔
1647
                }
108✔
1648
        }
1649
        return list, nil
6✔
1650
}
1651

1652
func (d *db) IsClosed() bool {
×
1653
        d.closingMutex.Lock()
×
1654
        defer d.closingMutex.Unlock()
×
1655

×
1656
        return d.st.IsClosed()
×
1657
}
×
1658

1659
// Close ...
1660
func (d *db) Close() (err error) {
1,186✔
1661
        d.closingMutex.Lock()
1,186✔
1662
        defer d.closingMutex.Unlock()
1,186✔
1663

1,186✔
1664
        d.Logger.Infof("closing database '%s'...", d.name)
1,186✔
1665

1,186✔
1666
        defer func() {
2,372✔
1667
                if err == nil {
1,958✔
1668
                        d.Logger.Infof("database '%s' successfully closed", d.name)
772✔
1669
                } else {
1,186✔
1670
                        d.Logger.Infof("%v: while closing database '%s'", err, d.name)
414✔
1671
                }
414✔
1672
        }()
1673

1674
        return d.st.Close()
1,186✔
1675
}
1676

1677
// GetName ...
1678
func (d *db) GetName() string {
964✔
1679
        return d.name
964✔
1680
}
964✔
1681

1682
// GetOptions ...
1683
func (d *db) GetOptions() *Options {
3,488✔
1684
        d.mutex.RLock()
3,488✔
1685
        defer d.mutex.RUnlock()
3,488✔
1686

3,488✔
1687
        return d.options
3,488✔
1688
}
3,488✔
1689

1690
func (d *db) AsReplica(asReplica, syncReplication bool, syncAcks int) {
16✔
1691
        d.mutex.Lock()
16✔
1692
        defer d.mutex.Unlock()
16✔
1693

16✔
1694
        d.replicaStatesMutex.Lock()
16✔
1695
        defer d.replicaStatesMutex.Unlock()
16✔
1696

16✔
1697
        d.options.replica = asReplica
16✔
1698
        d.options.syncAcks = syncAcks
16✔
1699
        d.options.syncReplication = syncReplication
16✔
1700

16✔
1701
        if asReplica {
25✔
1702
                d.replicaStates = nil
9✔
1703
        } else if syncAcks > 0 {
19✔
1704
                d.replicaStates = make(map[string]*replicaState, syncAcks)
3✔
1705
        }
3✔
1706

1707
        d.st.SetExternalCommitAllowance(syncReplication)
16✔
1708
}
1709

1710
func (d *db) IsReplica() bool {
606✔
1711
        d.mutex.RLock()
606✔
1712
        defer d.mutex.RUnlock()
606✔
1713

606✔
1714
        return d.isReplica()
606✔
1715
}
606✔
1716

1717
func (d *db) isReplica() bool {
19,475✔
1718
        return d.options.replica
19,475✔
1719
}
19,475✔
1720

1721
func (d *db) IsSyncReplicationEnabled() bool {
8,169✔
1722
        d.mutex.RLock()
8,169✔
1723
        defer d.mutex.RUnlock()
8,169✔
1724

8,169✔
1725
        return d.options.syncReplication
8,169✔
1726
}
8,169✔
1727

1728
func (d *db) SetSyncReplication(enabled bool) {
293✔
1729
        d.mutex.Lock()
293✔
1730
        defer d.mutex.Unlock()
293✔
1731

293✔
1732
        d.st.SetExternalCommitAllowance(enabled)
293✔
1733

293✔
1734
        d.options.syncReplication = enabled
293✔
1735
}
293✔
1736

1737
func logErr(log logger.Logger, formattedMessage string, err error) error {
1✔
1738
        if err != nil {
2✔
1739
                log.Errorf(formattedMessage, err)
1✔
1740
        }
1✔
1741
        return err
1✔
1742
}
1743

1744
// CopyCatalog creates a copy of the sql catalog and returns a transaction
1745
// that can be used to commit the copy.
1746
func (d *db) CopyCatalogToTx(ctx context.Context, tx *store.OngoingTx) error {
14✔
1747
        // copy the sql catalog
14✔
1748
        err := d.sqlEngine.CopyCatalogToTx(ctx, tx)
14✔
1749
        if err != nil {
14✔
1750
                return err
×
1751
        }
×
1752

1753
        // copy the document store catalog
1754
        err = d.documentEngine.CopyCatalogToTx(ctx, tx)
14✔
1755
        if err != nil {
14✔
1756
                return err
×
1757
        }
×
1758

1759
        return nil
14✔
1760
}
1761

1762
func (d *db) FindTruncationPoint(ctx context.Context, until time.Time) (*schema.TxHeader, error) {
10✔
1763
        hdr, err := d.st.LastTxUntil(until)
10✔
1764
        if errors.Is(err, store.ErrTxNotFound) {
17✔
1765
                return nil, ErrRetentionPeriodNotReached
7✔
1766
        }
7✔
1767
        if err != nil {
3✔
1768
                return nil, err
×
1769
        }
×
1770

1771
        // look for the newst transaction with entries
1772
        for err == nil {
6✔
1773
                if hdr.NEntries > 0 {
6✔
1774
                        break
3✔
1775
                }
1776

1777
                if ctx.Err() != nil {
×
1778
                        return nil, err
×
1779
                }
×
1780

1781
                hdr, err = d.st.ReadTxHeader(hdr.ID-1, false, false)
×
1782
        }
1783
        return schema.TxHeaderToProto(hdr), nil
3✔
1784
}
1785

1786
func (d *db) TruncateUptoTx(_ context.Context, txID uint64) error {
14✔
1787
        return d.st.TruncateUptoTx(txID)
14✔
1788
}
14✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc