• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

codenotary / immudb / 9299226861

30 May 2024 08:12AM UTC coverage: 89.451% (-0.04%) from 89.49%
9299226861

push

gh-ci

ostafen
Log request information as transaction metadata

Signed-off-by: Stefano Scafiti <stefano.scafiti96@gmail.com>

120 of 152 new or added lines in 16 files covered. (78.95%)

6 existing lines in 2 files now uncovered.

34859 of 38970 relevant lines covered (89.45%)

161745.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.77
/pkg/database/database.go
1
/*
2
Copyright 2024 Codenotary Inc. All rights reserved.
3

4
SPDX-License-Identifier: BUSL-1.1
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    https://mariadb.com/bsl11/
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package database
18

19
import (
20
        "context"
21
        "crypto/sha256"
22
        "encoding/binary"
23
        "errors"
24
        "fmt"
25
        "math"
26
        "os"
27
        "path/filepath"
28
        "sync"
29
        "time"
30

31
        "github.com/codenotary/immudb/embedded/document"
32
        "github.com/codenotary/immudb/embedded/sql"
33
        "github.com/codenotary/immudb/embedded/store"
34

35
        "github.com/codenotary/immudb/embedded/logger"
36
        "github.com/codenotary/immudb/pkg/api/schema"
37
)
38

39
const (
40
        MaxKeyResolutionLimit = 1
41
        MaxKeyScanLimit       = 2500
42
)
43

44
var (
45
        ErrKeyResolutionLimitReached  = errors.New("key resolution limit reached. It may be due to cyclic references")
46
        ErrResultSizeLimitExceeded    = errors.New("result size limit exceeded")
47
        ErrResultSizeLimitReached     = errors.New("result size limit reached")
48
        ErrIllegalArguments           = store.ErrIllegalArguments
49
        ErrIllegalState               = store.ErrIllegalState
50
        ErrIsReplica                  = errors.New("database is read-only because it's a replica")
51
        ErrNotReplica                 = errors.New("database is NOT a replica")
52
        ErrReplicaDivergedFromPrimary = errors.New("replica diverged from primary")
53
        ErrInvalidRevision            = errors.New("invalid key revision number")
54
)
55

56
type DB interface {
57
        GetName() string
58

59
        // Settings
60
        GetOptions() *Options
61

62
        Path() string
63

64
        AsReplica(asReplica, syncReplication bool, syncAcks int)
65
        IsReplica() bool
66

67
        IsSyncReplicationEnabled() bool
68
        SetSyncReplication(enabled bool)
69

70
        MaxResultSize() int
71

72
        // State
73
        Health() (waitingCount int, lastReleaseAt time.Time)
74
        CurrentState() (*schema.ImmutableState, error)
75

76
        Size() (uint64, error)
77

78
        TxCount() (uint64, error)
79

80
        // Key-Value
81
        Set(ctx context.Context, req *schema.SetRequest) (*schema.TxHeader, error)
82
        VerifiableSet(ctx context.Context, req *schema.VerifiableSetRequest) (*schema.VerifiableTx, error)
83

84
        Get(ctx context.Context, req *schema.KeyRequest) (*schema.Entry, error)
85
        VerifiableGet(ctx context.Context, req *schema.VerifiableGetRequest) (*schema.VerifiableEntry, error)
86
        GetAll(ctx context.Context, req *schema.KeyListRequest) (*schema.Entries, error)
87

88
        Delete(ctx context.Context, req *schema.DeleteKeysRequest) (*schema.TxHeader, error)
89

90
        SetReference(ctx context.Context, req *schema.ReferenceRequest) (*schema.TxHeader, error)
91
        VerifiableSetReference(ctx context.Context, req *schema.VerifiableReferenceRequest) (*schema.VerifiableTx, error)
92

93
        Scan(ctx context.Context, req *schema.ScanRequest) (*schema.Entries, error)
94

95
        History(ctx context.Context, req *schema.HistoryRequest) (*schema.Entries, error)
96

97
        ExecAll(ctx context.Context, operations *schema.ExecAllRequest) (*schema.TxHeader, error)
98

99
        Count(ctx context.Context, prefix *schema.KeyPrefix) (*schema.EntryCount, error)
100
        CountAll(ctx context.Context) (*schema.EntryCount, error)
101

102
        ZAdd(ctx context.Context, req *schema.ZAddRequest) (*schema.TxHeader, error)
103
        VerifiableZAdd(ctx context.Context, req *schema.VerifiableZAddRequest) (*schema.VerifiableTx, error)
104
        ZScan(ctx context.Context, req *schema.ZScanRequest) (*schema.ZEntries, error)
105

106
        // SQL-related
107
        NewSQLTx(ctx context.Context, opts *sql.TxOptions) (*sql.SQLTx, error)
108

109
        SQLExec(ctx context.Context, tx *sql.SQLTx, req *schema.SQLExecRequest) (ntx *sql.SQLTx, ctxs []*sql.SQLTx, err error)
110
        SQLExecPrepared(ctx context.Context, tx *sql.SQLTx, stmts []sql.SQLStmt, params map[string]interface{}) (ntx *sql.SQLTx, ctxs []*sql.SQLTx, err error)
111

112
        InferParameters(ctx context.Context, tx *sql.SQLTx, sql string) (map[string]sql.SQLValueType, error)
113
        InferParametersPrepared(ctx context.Context, tx *sql.SQLTx, stmt sql.SQLStmt) (map[string]sql.SQLValueType, error)
114

115
        SQLQuery(ctx context.Context, tx *sql.SQLTx, req *schema.SQLQueryRequest) (sql.RowReader, error)
116
        SQLQueryAll(ctx context.Context, tx *sql.SQLTx, req *schema.SQLQueryRequest) ([]*sql.Row, error)
117
        SQLQueryPrepared(ctx context.Context, tx *sql.SQLTx, stmt sql.DataSource, params map[string]interface{}) (sql.RowReader, error)
118

119
        VerifiableSQLGet(ctx context.Context, req *schema.VerifiableSQLGetRequest) (*schema.VerifiableSQLEntry, error)
120

121
        ListTables(ctx context.Context, tx *sql.SQLTx) (*schema.SQLQueryResult, error)
122
        DescribeTable(ctx context.Context, tx *sql.SQLTx, table string) (*schema.SQLQueryResult, error)
123

124
        // Transactional layer
125
        WaitForTx(ctx context.Context, txID uint64, allowPrecommitted bool) error
126
        WaitForIndexingUpto(ctx context.Context, txID uint64) error
127

128
        TxByID(ctx context.Context, req *schema.TxRequest) (*schema.Tx, error)
129
        ExportTxByID(ctx context.Context, req *schema.ExportTxRequest) (txbs []byte, mayCommitUpToTxID uint64, mayCommitUpToAlh [sha256.Size]byte, err error)
130
        ReplicateTx(ctx context.Context, exportedTx []byte, skipIntegrityCheck bool, waitForIndexing bool) (*schema.TxHeader, error)
131
        AllowCommitUpto(txID uint64, alh [sha256.Size]byte) error
132
        DiscardPrecommittedTxsSince(txID uint64) error
133

134
        VerifiableTxByID(ctx context.Context, req *schema.VerifiableTxRequest) (*schema.VerifiableTx, error)
135
        TxScan(ctx context.Context, req *schema.TxScanRequest) (*schema.TxList, error)
136

137
        // Maintenance
138
        FlushIndex(req *schema.FlushIndexRequest) error
139
        CompactIndex() error
140

141
        IsClosed() bool
142
        Close() error
143

144
        DocumentDatabase
145
}
146

147
type replicaState struct {
148
        precommittedTxID uint64
149
        precommittedAlh  [sha256.Size]byte
150
}
151

152
// db is the database instance implementing the DB interface
153
type db struct {
154
        st *store.ImmuStore
155

156
        sqlEngine      *sql.Engine
157
        documentEngine *document.Engine
158

159
        mutex        *instrumentedRWMutex
160
        closingMutex sync.Mutex
161

162
        Logger  logger.Logger
163
        options *Options
164

165
        name string
166

167
        maxResultSize int
168

169
        txPool store.TxPool
170

171
        replicaStates      map[string]*replicaState
172
        replicaStatesMutex sync.Mutex
173
}
174

175
// OpenDB Opens an existing Database from disk
176
func OpenDB(dbName string, multidbHandler sql.MultiDBHandler, opts *Options, log logger.Logger) (DB, error) {
37✔
177
        if dbName == "" {
38✔
178
                return nil, fmt.Errorf("%w: invalid database name provided '%s'", ErrIllegalArguments, dbName)
1✔
179
        }
1✔
180

181
        log.Infof("opening database '%s' {replica = %v}...", dbName, opts.replica)
36✔
182

36✔
183
        var replicaStates map[string]*replicaState
36✔
184
        // replica states are only managed in primary with synchronous replication
36✔
185
        if !opts.replica && opts.syncAcks > 0 {
37✔
186
                replicaStates = make(map[string]*replicaState, opts.syncAcks)
1✔
187
        }
1✔
188

189
        dbi := &db{
36✔
190
                Logger:        log,
36✔
191
                options:       opts,
36✔
192
                name:          dbName,
36✔
193
                replicaStates: replicaStates,
36✔
194
                maxResultSize: opts.maxResultSize,
36✔
195
                mutex:         &instrumentedRWMutex{},
36✔
196
        }
36✔
197

36✔
198
        dbDir := dbi.Path()
36✔
199
        _, err := os.Stat(dbDir)
36✔
200
        if os.IsNotExist(err) {
37✔
201
                return nil, fmt.Errorf("missing database directories: %s", dbDir)
1✔
202
        }
1✔
203

204
        stOpts := opts.GetStoreOptions().
35✔
205
                WithLogger(log).
35✔
206
                WithMultiIndexing(true).
35✔
207
                WithExternalCommitAllowance(opts.syncReplication)
35✔
208

35✔
209
        dbi.st, err = store.Open(dbDir, stOpts)
35✔
210
        if err != nil {
35✔
211
                return nil, logErr(dbi.Logger, "unable to open database: %s", err)
×
212
        }
×
213

214
        for _, prefix := range []byte{SetKeyPrefix, SortedSetKeyPrefix} {
105✔
215
                err := dbi.st.InitIndexing(&store.IndexSpec{
70✔
216
                        SourcePrefix: []byte{prefix},
70✔
217
                        TargetPrefix: []byte{prefix},
70✔
218
                })
70✔
219
                if err != nil {
70✔
220
                        dbi.st.Close()
×
221
                        return nil, logErr(dbi.Logger, "unable to open database: %s", err)
×
222
                }
×
223
        }
224

225
        dbi.Logger.Infof("loading sql-engine for database '%s' {replica = %v}...", dbName, opts.replica)
35✔
226

35✔
227
        sqlOpts := sql.DefaultOptions().
35✔
228
                WithPrefix([]byte{SQLPrefix}).
35✔
229
                WithMultiDBHandler(multidbHandler)
35✔
230

35✔
231
        dbi.sqlEngine, err = sql.NewEngine(dbi.st, sqlOpts)
35✔
232
        if err != nil {
35✔
233
                dbi.Logger.Errorf("unable to load sql-engine for database '%s' {replica = %v}. %v", dbName, opts.replica, err)
×
234
                return nil, err
×
235
        }
×
236
        dbi.Logger.Infof("sql-engine ready for database '%s' {replica = %v}", dbName, opts.replica)
35✔
237

35✔
238
        dbi.documentEngine, err = document.NewEngine(dbi.st, document.DefaultOptions().WithPrefix([]byte{DocumentPrefix}))
35✔
239
        if err != nil {
35✔
240
                return nil, err
×
241
        }
×
242
        dbi.Logger.Infof("document-engine ready for database '%s' {replica = %v}", dbName, opts.replica)
35✔
243

35✔
244
        txPool, err := dbi.st.NewTxHolderPool(opts.readTxPoolSize, false)
35✔
245
        if err != nil {
35✔
246
                return nil, logErr(dbi.Logger, "unable to create tx pool: %s", err)
×
247
        }
×
248
        dbi.txPool = txPool
35✔
249

35✔
250
        if opts.replica {
39✔
251
                dbi.Logger.Infof("database '%s' {replica = %v} successfully opened", dbName, opts.replica)
4✔
252
                return dbi, nil
4✔
253
        }
4✔
254

255
        dbi.Logger.Infof("database '%s' {replica = %v} successfully opened", dbName, opts.replica)
31✔
256

31✔
257
        return dbi, nil
31✔
258
}
259

260
// Path returns the directory of this database: the DB root path taken from
// the database options joined with the database name.
func (d *db) Path() string {
38✔
261
        return filepath.Join(d.options.GetDBRootPath(), d.GetName())
38✔
262
}
38✔
263

264
func (d *db) allocTx() (*store.Tx, error) {
10,897✔
265
        tx, err := d.txPool.Alloc()
10,897✔
266
        if errors.Is(err, store.ErrTxPoolExhausted) {
10,897✔
267
                return nil, ErrTxReadPoolExhausted
×
268
        }
×
269
        return tx, err
10,897✔
270
}
271

272
// releaseTx hands tx back to the database tx pool, the same pool allocTx
// allocates from.
func (d *db) releaseTx(tx *store.Tx) {
10,897✔
273
        d.txPool.Release(tx)
10,897✔
274
}
10,897✔
275

276
// NewDB Creates a new Database along with it's directories and files
277
func NewDB(dbName string, multidbHandler sql.MultiDBHandler, opts *Options, log logger.Logger) (DB, error) {
660✔
278
        if dbName == "" {
660✔
279
                return nil, fmt.Errorf("%w: invalid database name provided '%s'", ErrIllegalArguments, dbName)
×
280
        }
×
281

282
        log.Infof("creating database '%s' {replica = %v}...", dbName, opts.replica)
660✔
283

660✔
284
        var replicaStates map[string]*replicaState
660✔
285
        // replica states are only managed in primary with synchronous replication
660✔
286
        if !opts.replica && opts.syncAcks > 0 {
671✔
287
                replicaStates = make(map[string]*replicaState, opts.syncAcks)
11✔
288
        }
11✔
289

290
        dbi := &db{
660✔
291
                Logger:        log,
660✔
292
                options:       opts,
660✔
293
                name:          dbName,
660✔
294
                replicaStates: replicaStates,
660✔
295
                maxResultSize: opts.maxResultSize,
660✔
296
                mutex:         &instrumentedRWMutex{},
660✔
297
        }
660✔
298

660✔
299
        dbDir := filepath.Join(opts.GetDBRootPath(), dbName)
660✔
300

660✔
301
        _, err := os.Stat(dbDir)
660✔
302
        if err == nil {
661✔
303
                return nil, fmt.Errorf("database directories already exist: %s", dbDir)
1✔
304
        }
1✔
305

306
        if err = os.MkdirAll(dbDir, os.ModePerm); err != nil {
660✔
307
                return nil, logErr(dbi.Logger, "unable to create data folder: %s", err)
1✔
308
        }
1✔
309

310
        stOpts := opts.GetStoreOptions().
658✔
311
                WithExternalCommitAllowance(opts.syncReplication).
658✔
312
                WithMultiIndexing(true).
658✔
313
                WithLogger(log)
658✔
314

658✔
315
        dbi.st, err = store.Open(dbDir, stOpts)
658✔
316
        if err != nil {
658✔
317
                return nil, logErr(dbi.Logger, "unable to open database: %s", err)
×
318
        }
×
319

320
        for _, prefix := range []byte{SetKeyPrefix, SortedSetKeyPrefix} {
1,974✔
321
                err := dbi.st.InitIndexing(&store.IndexSpec{
1,316✔
322
                        SourcePrefix: []byte{prefix},
1,316✔
323
                        TargetPrefix: []byte{prefix},
1,316✔
324
                })
1,316✔
325
                if err != nil {
1,316✔
326
                        dbi.st.Close()
×
327
                        return nil, logErr(dbi.Logger, "unable to open database: %s", err)
×
328
                }
×
329
        }
330

331
        txPool, err := dbi.st.NewTxHolderPool(opts.readTxPoolSize, false)
658✔
332
        if err != nil {
658✔
333
                return nil, logErr(dbi.Logger, "unable to create tx pool: %s", err)
×
334
        }
×
335
        dbi.txPool = txPool
658✔
336

658✔
337
        sqlOpts := sql.DefaultOptions().
658✔
338
                WithPrefix([]byte{SQLPrefix}).
658✔
339
                WithMultiDBHandler(multidbHandler)
658✔
340

658✔
341
        dbi.Logger.Infof("loading sql-engine for database '%s' {replica = %v}...", dbName, opts.replica)
658✔
342

658✔
343
        dbi.sqlEngine, err = sql.NewEngine(dbi.st, sqlOpts)
658✔
344
        if err != nil {
658✔
345
                dbi.Logger.Errorf("unable to load sql-engine for database '%s' {replica = %v}. %v", dbName, opts.replica, err)
×
346
                return nil, err
×
347
        }
×
348
        dbi.Logger.Infof("sql-engine ready for database '%s' {replica = %v}", dbName, opts.replica)
658✔
349

658✔
350
        dbi.documentEngine, err = document.NewEngine(dbi.st, document.DefaultOptions().WithPrefix([]byte{DocumentPrefix}))
658✔
351
        if err != nil {
658✔
352
                return nil, logErr(dbi.Logger, "Unable to open database: %s", err)
×
353
        }
×
354
        dbi.Logger.Infof("document-engine ready for database '%s' {replica = %v}", dbName, opts.replica)
658✔
355

658✔
356
        dbi.Logger.Infof("database '%s' successfully created {replica = %v}", dbName, opts.replica)
658✔
357

658✔
358
        return dbi, nil
658✔
359
}
360

361
// MaxResultSize returns the maximum result size of this database, as taken
// from the options when the database was opened or created.
func (d *db) MaxResultSize() int {
78✔
362
        return d.maxResultSize
78✔
363
}
78✔
364

365
func (d *db) FlushIndex(req *schema.FlushIndexRequest) error {
6✔
366
        if req == nil {
7✔
367
                return store.ErrIllegalArguments
1✔
368
        }
1✔
369

370
        return d.st.FlushIndexes(req.CleanupPercentage, req.Synced)
5✔
371
}
372

373
// CompactIndex compacts the indexes of the underlying store and returns the
// error, if any, reported by the store.
374
func (d *db) CompactIndex() error {
4✔
375
        return d.st.CompactIndexes()
4✔
376
}
4✔
377

378
// Set ...
379
func (d *db) Set(ctx context.Context, req *schema.SetRequest) (*schema.TxHeader, error) {
4,424✔
380
        d.mutex.RLock()
4,424✔
381
        defer d.mutex.RUnlock()
4,424✔
382

4,424✔
383
        if d.isReplica() {
4,427✔
384
                return nil, ErrIsReplica
3✔
385
        }
3✔
386

387
        return d.set(ctx, req)
4,421✔
388
}
389

390
func (d *db) set(ctx context.Context, req *schema.SetRequest) (*schema.TxHeader, error) {
4,421✔
391
        if req == nil {
4,422✔
392
                return nil, ErrIllegalArguments
1✔
393
        }
1✔
394

395
        tx, err := d.newWriteOnlyTx(ctx)
4,420✔
396
        if err != nil {
4,420✔
397
                return nil, err
×
398
        }
×
399
        defer tx.Cancel()
4,420✔
400

4,420✔
401
        keys := make(map[[sha256.Size]byte]struct{}, len(req.KVs))
4,420✔
402

4,420✔
403
        for _, kv := range req.KVs {
10,135✔
404
                if len(kv.Key) == 0 {
5,717✔
405
                        return nil, ErrIllegalArguments
2✔
406
                }
2✔
407

408
                kid := sha256.Sum256(kv.Key)
5,713✔
409
                _, ok := keys[kid]
5,713✔
410
                if ok {
5,714✔
411
                        return nil, schema.ErrDuplicatedKeysNotSupported
1✔
412
                }
1✔
413
                keys[kid] = struct{}{}
5,712✔
414

5,712✔
415
                e := EncodeEntrySpec(
5,712✔
416
                        kv.Key,
5,712✔
417
                        schema.KVMetadataFromProto(kv.Metadata),
5,712✔
418
                        kv.Value,
5,712✔
419
                )
5,712✔
420

5,712✔
421
                err = tx.Set(e.Key, e.Metadata, e.Value)
5,712✔
422
                if err != nil {
5,713✔
423
                        return nil, err
1✔
424
                }
1✔
425
        }
426

427
        for i := range req.Preconditions {
5,487✔
428
                c, err := PreconditionFromProto(req.Preconditions[i])
1,071✔
429
                if err != nil {
1,072✔
430
                        return nil, err
1✔
431
                }
1✔
432

433
                err = tx.AddPrecondition(c)
1,070✔
434
                if err != nil {
1,071✔
435
                        return nil, fmt.Errorf("%w: %v", store.ErrInvalidPrecondition, err)
1✔
436
                }
1✔
437
        }
438

439
        var hdr *store.TxHeader
4,414✔
440

4,414✔
441
        if req.NoWait {
4,414✔
442
                hdr, err = tx.AsyncCommit(ctx)
×
443
        } else {
4,414✔
444
                hdr, err = tx.Commit(ctx)
4,414✔
445
        }
4,414✔
446
        if err != nil {
4,445✔
447
                return nil, err
31✔
448
        }
31✔
449

450
        return schema.TxHeaderToProto(hdr), nil
4,383✔
451
}
452

453
func (d *db) newWriteOnlyTx(ctx context.Context) (*store.OngoingTx, error) {
4,420✔
454
        tx, err := d.st.NewWriteOnlyTx(ctx)
4,420✔
455
        if err != nil {
4,420✔
NEW
456
                return nil, err
×
NEW
457
        }
×
458
        return d.txWithMetadata(ctx, tx)
4,420✔
459
}
460

461
func (d *db) newTx(ctx context.Context, opts *store.TxOptions) (*store.OngoingTx, error) {
7✔
462
        tx, err := d.st.NewTx(ctx, opts)
7✔
463
        if err != nil {
7✔
NEW
464
                return nil, err
×
NEW
465
        }
×
466
        return d.txWithMetadata(ctx, tx)
7✔
467
}
468

469
func (d *db) txWithMetadata(ctx context.Context, tx *store.OngoingTx) (*store.OngoingTx, error) {
4,427✔
470
        meta := schema.MetadataFromContext(ctx)
4,427✔
471
        if len(meta) > 0 {
4,487✔
472
                txmd := store.NewTxMetadata()
60✔
473

60✔
474
                data, err := meta.Marshal()
60✔
475
                if err != nil {
60✔
NEW
476
                        return nil, err
×
NEW
477
                }
×
478

479
                if err := txmd.WithExtra(data); err != nil {
60✔
NEW
480
                        return nil, err
×
NEW
481
                }
×
482
                return tx.WithMetadata(txmd), nil
60✔
483
        }
484
        return tx, nil
4,367✔
485
}
486

487
func checkKeyRequest(req *schema.KeyRequest) error {
3,098✔
488
        if req == nil {
3,099✔
489
                return fmt.Errorf(
1✔
490
                        "%w: empty request",
1✔
491
                        ErrIllegalArguments,
1✔
492
                )
1✔
493
        }
1✔
494

495
        if len(req.Key) == 0 {
3,099✔
496
                return fmt.Errorf(
2✔
497
                        "%w: empty key",
2✔
498
                        ErrIllegalArguments,
2✔
499
                )
2✔
500
        }
2✔
501

502
        if req.AtTx > 0 {
3,120✔
503
                if req.SinceTx > 0 {
27✔
504
                        return fmt.Errorf(
2✔
505
                                "%w: SinceTx should not be specified when AtTx is used",
2✔
506
                                ErrIllegalArguments,
2✔
507
                        )
2✔
508
                }
2✔
509

510
                if req.AtRevision != 0 {
24✔
511
                        return fmt.Errorf(
1✔
512
                                "%w: AtRevision should not be specified when AtTx is used",
1✔
513
                                ErrIllegalArguments,
1✔
514
                        )
1✔
515
                }
1✔
516
        }
517

518
        return nil
3,092✔
519
}
520

521
// Get ...
522
func (d *db) Get(ctx context.Context, req *schema.KeyRequest) (*schema.Entry, error) {
3,094✔
523
        err := checkKeyRequest(req)
3,094✔
524
        if err != nil {
3,096✔
525
                return nil, err
2✔
526
        }
2✔
527

528
        currTxID, _ := d.st.CommittedAlh()
3,092✔
529
        if req.SinceTx > currTxID {
3,093✔
530
                return nil, fmt.Errorf(
1✔
531
                        "%w: SinceTx must not be greater than the current transaction ID",
1✔
532
                        ErrIllegalArguments,
1✔
533
                )
1✔
534
        }
1✔
535

536
        if !req.NoWait && req.AtTx == 0 {
6,160✔
537
                waitUntilTx := req.SinceTx
3,069✔
538
                if waitUntilTx == 0 {
5,041✔
539
                        waitUntilTx = currTxID
1,972✔
540
                }
1,972✔
541

542
                err := d.WaitForIndexingUpto(ctx, waitUntilTx)
3,069✔
543
                if err != nil {
3,071✔
544
                        return nil, err
2✔
545
                }
2✔
546
        }
547

548
        if req.AtRevision != 0 {
3,120✔
549
                return d.getAtRevision(ctx, EncodeKey(req.Key), req.AtRevision, true)
31✔
550
        }
31✔
551

552
        return d.getAtTx(ctx, EncodeKey(req.Key), req.AtTx, 0, d.st, 0, true)
3,058✔
553
}
554

555
// get resolves key through the given index by delegating to getAtTx with
// atTx, the resolution counter and the revision all set to 0.
func (d *db) get(ctx context.Context, key []byte, index store.KeyIndex, skipIntegrityCheck bool) (*schema.Entry, error) {
10✔
556
        return d.getAtTx(ctx, key, 0, 0, index, 0, skipIntegrityCheck)
10✔
557
}
10✔
558

559
func (d *db) getAtTx(
560
        ctx context.Context,
561
        key []byte,
562
        atTx uint64,
563
        resolved int,
564
        index store.KeyIndex,
565
        revision uint64,
566
        skipIntegrityCheck bool,
567
) (entry *schema.Entry, err error) {
5,632✔
568

5,632✔
569
        var txID uint64
5,632✔
570
        var val []byte
5,632✔
571
        var md *store.KVMetadata
5,632✔
572

5,632✔
573
        if atTx == 0 {
9,346✔
574
                valRef, err := index.Get(ctx, key)
3,714✔
575
                if err != nil {
3,795✔
576
                        return nil, err
81✔
577
                }
81✔
578

579
                txID = valRef.Tx()
3,633✔
580
                revision = valRef.HC()
3,633✔
581
                md = valRef.KVMetadata()
3,633✔
582

3,633✔
583
                val, err = valRef.Resolve()
3,633✔
584
                if err != nil {
3,637✔
585
                        return nil, err
4✔
586
                }
4✔
587
        } else {
1,918✔
588
                txID = atTx
1,918✔
589

1,918✔
590
                md, val, err = d.readMetadataAndValue(key, atTx, skipIntegrityCheck)
1,918✔
591
                if err != nil {
2,013✔
592
                        return nil, err
95✔
593
                }
95✔
594
        }
595

596
        return d.resolveValue(ctx, key, val, resolved, txID, md, index, revision, skipIntegrityCheck)
5,452✔
597
}
598

599
func (d *db) readMetadataAndValue(key []byte, atTx uint64, skipIntegrityCheck bool) (*store.KVMetadata, []byte, error) {
1,918✔
600
        entry, _, err := d.st.ReadTxEntry(atTx, key, skipIntegrityCheck)
1,918✔
601
        if err != nil {
2,004✔
602
                return nil, nil, err
86✔
603
        }
86✔
604

605
        v, err := d.st.ReadValue(entry)
1,832✔
606
        if err != nil {
1,841✔
607
                return nil, nil, err
9✔
608
        }
9✔
609

610
        return entry.Metadata(), v, nil
1,823✔
611
}
612

613
func (d *db) getAtRevision(ctx context.Context, key []byte, atRevision int64, skipIntegrityCheck bool) (entry *schema.Entry, err error) {
31✔
614
        var offset uint64
31✔
615
        var desc bool
31✔
616

31✔
617
        if atRevision > 0 {
47✔
618
                offset = uint64(atRevision) - 1
16✔
619
                desc = false
16✔
620
        } else {
31✔
621
                offset = -uint64(atRevision)
15✔
622
                desc = true
15✔
623
        }
15✔
624

625
        valRefs, hCount, err := d.st.History(key, offset, desc, 1)
31✔
626
        if errors.Is(err, store.ErrNoMoreEntries) || errors.Is(err, store.ErrOffsetOutOfRange) {
35✔
627
                return nil, ErrInvalidRevision
4✔
628
        }
4✔
629
        if err != nil {
28✔
630
                return nil, err
1✔
631
        }
1✔
632

633
        if atRevision < 0 {
39✔
634
                atRevision = int64(hCount) + atRevision
13✔
635
        }
13✔
636

637
        entry, err = d.getAtTx(ctx, key, valRefs[0].Tx(), 0, d.st, uint64(atRevision), skipIntegrityCheck)
26✔
638
        if err != nil {
27✔
639
                return nil, err
1✔
640
        }
1✔
641

642
        return entry, err
25✔
643
}
644

645
func (d *db) resolveValue(
646
        ctx context.Context,
647
        key []byte,
648
        val []byte,
649
        resolved int,
650
        txID uint64,
651
        md *store.KVMetadata,
652
        index store.KeyIndex,
653
        revision uint64,
654
        skipIntegrityCheck bool,
655
) (entry *schema.Entry, err error) {
5,461✔
656
        if md != nil && md.Deleted() {
5,463✔
657
                return nil, store.ErrKeyNotFound
2✔
658
        }
2✔
659

660
        if len(val) < 1 {
5,459✔
661
                return nil, fmt.Errorf("%w: internal value consistency error - missing value prefix", store.ErrCorruptedData)
×
662
        }
×
663

664
        // Reference lookup
665
        if val[0] == ReferenceValuePrefix {
5,671✔
666
                if len(val) < 1+8 {
212✔
667
                        return nil, fmt.Errorf("%w: internal value consistency error - invalid reference", store.ErrCorruptedData)
×
668
                }
×
669

670
                if resolved == MaxKeyResolutionLimit {
214✔
671
                        return nil, ErrKeyResolutionLimitReached
2✔
672
                }
2✔
673

674
                atTx := binary.BigEndian.Uint64(TrimPrefix(val))
210✔
675
                refKey := make([]byte, len(val)-1-8)
210✔
676
                copy(refKey, val[1+8:])
210✔
677

210✔
678
                if index != nil {
419✔
679
                        entry, err = d.getAtTx(ctx, refKey, atTx, resolved+1, index, 0, skipIntegrityCheck)
209✔
680
                        if err != nil {
210✔
681
                                return nil, err
1✔
682
                        }
1✔
683
                } else {
1✔
684
                        entry = &schema.Entry{
1✔
685
                                Key: TrimPrefix(refKey),
1✔
686
                                Tx:  atTx,
1✔
687
                        }
1✔
688
                }
1✔
689

690
                entry.ReferencedBy = &schema.Reference{
209✔
691
                        Tx:       txID,
209✔
692
                        Key:      TrimPrefix(key),
209✔
693
                        Metadata: schema.KVMetadataToProto(md),
209✔
694
                        AtTx:     atTx,
209✔
695
                        Revision: revision,
209✔
696
                }
209✔
697

209✔
698
                return entry, nil
209✔
699
        }
700

701
        return &schema.Entry{
5,247✔
702
                Tx:       txID,
5,247✔
703
                Key:      TrimPrefix(key),
5,247✔
704
                Metadata: schema.KVMetadataToProto(md),
5,247✔
705
                Value:    TrimPrefix(val),
5,247✔
706
                Revision: revision,
5,247✔
707
        }, nil
5,247✔
708
}
709

710
// Health reports the state of the instrumented database mutex, as returned
// by its State method.
// NOTE(review): judging by the result names these are the number of waiters
// and the time of the last lock release - confirm against instrumentedRWMutex.
func (d *db) Health() (waitingCount int, lastReleaseAt time.Time) {
1✔
711
        return d.mutex.State()
1✔
712
}
1✔
713

714
// CurrentState ...
715
func (d *db) CurrentState() (*schema.ImmutableState, error) {
15,191✔
716
        lastTxID, lastTxAlh := d.st.CommittedAlh()
15,191✔
717
        lastPreTxID, lastPreTxAlh := d.st.PrecommittedAlh()
15,191✔
718

15,191✔
719
        return &schema.ImmutableState{
15,191✔
720
                TxId:               lastTxID,
15,191✔
721
                TxHash:             lastTxAlh[:],
15,191✔
722
                PrecommittedTxId:   lastPreTxID,
15,191✔
723
                PrecommittedTxHash: lastPreTxAlh[:],
15,191✔
724
        }, nil
15,191✔
725
}
15,191✔
726

727
// WaitForTx blocks the caller until the transaction identified by txID is
// available, delegating to the underlying store's WaitForTx.
// allowPrecommitted is forwarded to the store unchanged.
728
func (d *db) WaitForTx(ctx context.Context, txID uint64, allowPrecommitted bool) error {
8,376✔
729
        return d.st.WaitForTx(ctx, txID, allowPrecommitted)
8,376✔
730
}
8,376✔
731

732
// WaitForIndexingUpto blocks the caller until the transaction identified by
// txID gets indexed, delegating to the underlying store's WaitForIndexingUpto.
733
func (d *db) WaitForIndexingUpto(ctx context.Context, txID uint64) error {
3,076✔
734
        return d.st.WaitForIndexingUpto(ctx, txID)
3,076✔
735
}
3,076✔
736

737
// VerifiableSet ...
738
func (d *db) VerifiableSet(ctx context.Context, req *schema.VerifiableSetRequest) (*schema.VerifiableTx, error) {
49✔
739
        if req == nil {
50✔
740
                return nil, ErrIllegalArguments
1✔
741
        }
1✔
742

743
        lastTxID, _ := d.st.CommittedAlh()
48✔
744
        if lastTxID < req.ProveSinceTx {
49✔
745
                return nil, ErrIllegalState
1✔
746
        }
1✔
747

748
        // Preallocate tx buffers
749
        lastTx, err := d.allocTx()
47✔
750
        if err != nil {
47✔
751
                return nil, err
×
752
        }
×
753
        defer d.releaseTx(lastTx)
47✔
754

47✔
755
        txhdr, err := d.Set(ctx, req.SetRequest)
47✔
756
        if err != nil {
48✔
757
                return nil, err
1✔
758
        }
1✔
759

760
        err = d.st.ReadTx(uint64(txhdr.Id), false, lastTx)
46✔
761
        if err != nil {
46✔
762
                return nil, err
×
763
        }
×
764

765
        var prevTxHdr *store.TxHeader
46✔
766

46✔
767
        if req.ProveSinceTx == 0 {
66✔
768
                prevTxHdr = lastTx.Header()
20✔
769
        } else {
46✔
770
                prevTxHdr, err = d.st.ReadTxHeader(req.ProveSinceTx, false, false)
26✔
771
                if err != nil {
26✔
772
                        return nil, err
×
773
                }
×
774
        }
775

776
        dualProof, err := d.st.DualProof(prevTxHdr, lastTx.Header())
46✔
777
        if err != nil {
46✔
778
                return nil, err
×
779
        }
×
780

781
        return &schema.VerifiableTx{
46✔
782
                Tx:        schema.TxToProto(lastTx),
46✔
783
                DualProof: schema.DualProofToProto(dualProof),
46✔
784
        }, nil
46✔
785
}
786

787
// VerifiableGet ...
788
func (d *db) VerifiableGet(ctx context.Context, req *schema.VerifiableGetRequest) (*schema.VerifiableEntry, error) {
1,038✔
789
        if req == nil {
1,039✔
790
                return nil, ErrIllegalArguments
1✔
791
        }
1✔
792

793
        lastTxID, _ := d.st.CommittedAlh()
1,037✔
794
        if lastTxID < req.ProveSinceTx {
1,038✔
795
                return nil, ErrIllegalState
1✔
796
        }
1✔
797

798
        e, err := d.Get(ctx, req.KeyRequest)
1,036✔
799
        if err != nil {
1,040✔
800
                return nil, err
4✔
801
        }
4✔
802

803
        var vTxID uint64
1,032✔
804
        var vKey []byte
1,032✔
805

1,032✔
806
        if e.ReferencedBy == nil {
2,059✔
807
                vTxID = e.Tx
1,027✔
808
                vKey = e.Key
1,027✔
809
        } else {
1,032✔
810
                vTxID = e.ReferencedBy.Tx
5✔
811
                vKey = e.ReferencedBy.Key
5✔
812
        }
5✔
813

814
        // key-value inclusion proof
815
        tx, err := d.allocTx()
1,032✔
816
        if err != nil {
1,032✔
817
                return nil, err
×
818
        }
×
819
        defer d.releaseTx(tx)
1,032✔
820

1,032✔
821
        err = d.st.ReadTx(vTxID, false, tx)
1,032✔
822
        if err != nil {
1,032✔
823
                return nil, err
×
824
        }
×
825

826
        var rootTxHdr *store.TxHeader
1,032✔
827

1,032✔
828
        if req.ProveSinceTx == 0 {
2,043✔
829
                rootTxHdr = tx.Header()
1,011✔
830
        } else {
1,032✔
831
                rootTxHdr, err = d.st.ReadTxHeader(req.ProveSinceTx, false, false)
21✔
832
                if err != nil {
21✔
833
                        return nil, err
×
834
                }
×
835
        }
836

837
        inclusionProof, err := tx.Proof(EncodeKey(vKey))
1,032✔
838
        if err != nil {
1,032✔
839
                return nil, err
×
840
        }
×
841

842
        var sourceTxHdr, targetTxHdr *store.TxHeader
1,032✔
843

1,032✔
844
        if req.ProveSinceTx <= vTxID {
2,058✔
845
                sourceTxHdr = rootTxHdr
1,026✔
846
                targetTxHdr = tx.Header()
1,026✔
847
        } else {
1,032✔
848
                sourceTxHdr = tx.Header()
6✔
849
                targetTxHdr = rootTxHdr
6✔
850
        }
6✔
851

852
        dualProof, err := d.st.DualProof(sourceTxHdr, targetTxHdr)
1,032✔
853
        if err != nil {
1,032✔
854
                return nil, err
×
855
        }
×
856

857
        verifiableTx := &schema.VerifiableTx{
1,032✔
858
                Tx:        schema.TxToProto(tx),
1,032✔
859
                DualProof: schema.DualProofToProto(dualProof),
1,032✔
860
        }
1,032✔
861

1,032✔
862
        return &schema.VerifiableEntry{
1,032✔
863
                Entry:          e,
1,032✔
864
                VerifiableTx:   verifiableTx,
1,032✔
865
                InclusionProof: schema.InclusionProofToProto(inclusionProof),
1,032✔
866
        }, nil
1,032✔
867
}
868

869
func (d *db) Delete(ctx context.Context, req *schema.DeleteKeysRequest) (*schema.TxHeader, error) {
9✔
870
        if req == nil {
10✔
871
                return nil, ErrIllegalArguments
1✔
872
        }
1✔
873

874
        d.mutex.RLock()
8✔
875
        defer d.mutex.RUnlock()
8✔
876

8✔
877
        if d.isReplica() {
8✔
878
                return nil, ErrIsReplica
×
879
        }
×
880

881
        opts := store.DefaultTxOptions()
8✔
882

8✔
883
        if req.SinceTx > 0 {
9✔
884
                if req.SinceTx > d.st.LastPrecommittedTxID() {
2✔
885
                        return nil, store.ErrTxNotFound
1✔
886
                }
1✔
887

888
                opts.WithSnapshotMustIncludeTxID(func(_ uint64) uint64 {
×
889
                        return req.SinceTx
×
890
                })
×
891
        }
892

893
        tx, err := d.newTx(ctx, opts)
7✔
894
        if err != nil {
7✔
895
                return nil, err
×
896
        }
×
897
        defer tx.Cancel()
7✔
898

7✔
899
        for _, k := range req.Keys {
13✔
900
                if len(k) == 0 {
6✔
901
                        return nil, ErrIllegalArguments
×
902
                }
×
903

904
                md := store.NewKVMetadata()
6✔
905

6✔
906
                md.AsDeleted(true)
6✔
907

6✔
908
                e := EncodeEntrySpec(k, md, nil)
6✔
909

6✔
910
                err = tx.Delete(ctx, e.Key)
6✔
911
                if err != nil {
7✔
912
                        return nil, err
1✔
913
                }
1✔
914
        }
915

916
        var hdr *store.TxHeader
6✔
917
        if req.NoWait {
6✔
918
                hdr, err = tx.AsyncCommit(ctx)
×
919
        } else {
6✔
920
                hdr, err = tx.Commit(ctx)
6✔
921
        }
6✔
922
        if err != nil {
7✔
923
                return nil, err
1✔
924
        }
1✔
925

926
        return schema.TxHeaderToProto(hdr), nil
5✔
927
}
928

929
// GetAll ...
930
func (d *db) GetAll(ctx context.Context, req *schema.KeyListRequest) (*schema.Entries, error) {
4✔
931
        snap, err := d.snapshotSince(ctx, []byte{SetKeyPrefix}, req.SinceTx)
4✔
932
        if err != nil {
4✔
933
                return nil, err
×
934
        }
×
935
        defer snap.Close()
4✔
936

4✔
937
        list := &schema.Entries{}
4✔
938

4✔
939
        for _, key := range req.Keys {
14✔
940
                e, err := d.get(ctx, EncodeKey(key), snap, true)
10✔
941
                if err == nil || errors.Is(err, store.ErrKeyNotFound) {
20✔
942
                        if e != nil {
19✔
943
                                list.Entries = append(list.Entries, e)
9✔
944
                        }
9✔
945
                } else {
×
946
                        return nil, err
×
947
                }
×
948
        }
949

950
        return list, nil
4✔
951
}
952

953
func (d *db) Size() (uint64, error) {
107✔
954
        return d.st.Size()
107✔
955
}
107✔
956

957
// TxCount ...
958
func (d *db) TxCount() (uint64, error) {
333✔
959
        return d.st.TxCount(), nil
333✔
960
}
333✔
961

962
// Count ...
963
func (d *db) Count(ctx context.Context, prefix *schema.KeyPrefix) (*schema.EntryCount, error) {
8✔
964
        if prefix == nil {
10✔
965
                return nil, ErrIllegalArguments
2✔
966
        }
2✔
967

968
        tx, err := d.st.NewTx(ctx, store.DefaultTxOptions().WithMode(store.ReadOnlyTx))
6✔
969
        if err != nil {
6✔
970
                return nil, err
×
971
        }
×
972
        defer tx.Cancel()
6✔
973

6✔
974
        keyReader, err := tx.NewKeyReader(store.KeyReaderSpec{
6✔
975
                Prefix: WrapWithPrefix(prefix.Prefix, SetKeyPrefix),
6✔
976
        })
6✔
977
        if err != nil {
6✔
978
                return nil, err
×
979
        }
×
980
        defer keyReader.Close()
6✔
981

6✔
982
        count := 0
6✔
983

6✔
984
        for {
19✔
985
                _, _, err := keyReader.Read(ctx)
13✔
986
                if errors.Is(err, store.ErrNoMoreEntries) {
19✔
987
                        break
6✔
988
                }
989
                if err != nil {
7✔
990
                        return nil, err
×
991
                }
×
992

993
                count++
7✔
994
        }
995

996
        return &schema.EntryCount{Count: uint64(count)}, nil
6✔
997
}
998

999
// CountAll ...
1000
func (d *db) CountAll(ctx context.Context) (*schema.EntryCount, error) {
4✔
1001
        return d.Count(ctx, &schema.KeyPrefix{})
4✔
1002
}
4✔
1003

1004
// TxByID ...
1005
func (d *db) TxByID(ctx context.Context, req *schema.TxRequest) (*schema.Tx, error) {
62✔
1006
        if req == nil {
63✔
1007
                return nil, ErrIllegalArguments
1✔
1008
        }
1✔
1009

1010
        var snap *store.Snapshot
61✔
1011
        var err error
61✔
1012

61✔
1013
        tx, err := d.allocTx()
61✔
1014
        if err != nil {
61✔
1015
                return nil, err
×
1016
        }
×
1017
        defer d.releaseTx(tx)
61✔
1018

61✔
1019
        if !req.KeepReferencesUnresolved {
120✔
1020
                snap, err = d.snapshotSince(ctx, []byte{SetKeyPrefix}, req.SinceTx)
59✔
1021
                if err != nil {
59✔
1022
                        return nil, err
×
1023
                }
×
1024
                defer snap.Close()
59✔
1025
        }
1026

1027
        // key-value inclusion proof
1028
        err = d.st.ReadTx(req.Tx, false, tx)
61✔
1029
        if err != nil {
61✔
1030
                return nil, err
×
1031
        }
×
1032

1033
        return d.serializeTx(ctx, tx, req.EntriesSpec, snap, true)
61✔
1034
}
1035

1036
func (d *db) snapshotSince(ctx context.Context, prefix []byte, txID uint64) (*store.Snapshot, error) {
1,709✔
1037
        currTxID, _ := d.st.CommittedAlh()
1,709✔
1038

1,709✔
1039
        if txID > currTxID {
1,709✔
1040
                return nil, ErrIllegalArguments
×
1041
        }
×
1042

1043
        waitUntilTx := txID
1,709✔
1044
        if waitUntilTx == 0 {
3,352✔
1045
                waitUntilTx = currTxID
1,643✔
1046
        }
1,643✔
1047

1048
        return d.st.SnapshotMustIncludeTxID(ctx, prefix, waitUntilTx)
1,709✔
1049
}
1050

1051
func (d *db) serializeTx(ctx context.Context, tx *store.Tx, spec *schema.EntriesSpec, snap *store.Snapshot, skipIntegrityCheck bool) (*schema.Tx, error) {
1,376✔
1052
        if spec == nil {
2,242✔
1053
                return schema.TxToProto(tx), nil
866✔
1054
        }
866✔
1055

1056
        stx := &schema.Tx{
510✔
1057
                Header: schema.TxHeaderToProto(tx.Header()),
510✔
1058
        }
510✔
1059

510✔
1060
        for _, e := range tx.Entries() {
1,048✔
1061
                switch e.Key()[0] {
538✔
1062
                case SetKeyPrefix:
39✔
1063
                        {
78✔
1064
                                if spec.KvEntriesSpec == nil || spec.KvEntriesSpec.Action == schema.EntryTypeAction_EXCLUDE {
57✔
1065
                                        break
18✔
1066
                                }
1067

1068
                                if spec.KvEntriesSpec.Action == schema.EntryTypeAction_ONLY_DIGEST {
24✔
1069
                                        stx.Entries = append(stx.Entries, schema.TxEntryToProto(e))
3✔
1070
                                        break
3✔
1071
                                }
1072

1073
                                v, err := d.st.ReadValue(e)
18✔
1074
                                if errors.Is(err, store.ErrExpiredEntry) {
18✔
1075
                                        break
×
1076
                                }
1077
                                if err != nil {
18✔
1078
                                        return nil, err
×
1079
                                }
×
1080

1081
                                if spec.KvEntriesSpec.Action == schema.EntryTypeAction_RAW_VALUE {
27✔
1082
                                        kve := schema.TxEntryToProto(e)
9✔
1083
                                        kve.Value = v
9✔
1084
                                        stx.Entries = append(stx.Entries, kve)
9✔
1085
                                        break
9✔
1086
                                }
1087

1088
                                // resolve entry
1089
                                var index store.KeyIndex
9✔
1090
                                if snap != nil {
16✔
1091
                                        index = snap
7✔
1092
                                }
7✔
1093

1094
                                kve, err := d.resolveValue(ctx, e.Key(), v, 0, tx.Header().ID, e.Metadata(), index, 0, skipIntegrityCheck)
9✔
1095
                                if errors.Is(err, store.ErrKeyNotFound) || errors.Is(err, store.ErrExpiredEntry) {
10✔
1096
                                        break // ignore deleted ones (referenced key may have been deleted)
1✔
1097
                                }
1098
                                if err != nil {
8✔
1099
                                        return nil, err
×
1100
                                }
×
1101

1102
                                stx.KvEntries = append(stx.KvEntries, kve)
8✔
1103
                        }
1104
                case SortedSetKeyPrefix:
14✔
1105
                        {
28✔
1106
                                if spec.ZEntriesSpec == nil || spec.ZEntriesSpec.Action == schema.EntryTypeAction_EXCLUDE {
22✔
1107
                                        break
8✔
1108
                                }
1109

1110
                                if spec.ZEntriesSpec.Action == schema.EntryTypeAction_ONLY_DIGEST {
7✔
1111
                                        stx.Entries = append(stx.Entries, schema.TxEntryToProto(e))
1✔
1112
                                        break
1✔
1113
                                }
1114

1115
                                if spec.ZEntriesSpec.Action == schema.EntryTypeAction_RAW_VALUE {
6✔
1116
                                        v, err := d.st.ReadValue(e)
1✔
1117
                                        if errors.Is(err, store.ErrExpiredEntry) {
1✔
1118
                                                break
×
1119
                                        }
1120
                                        if err != nil {
1✔
1121
                                                return nil, err
×
1122
                                        }
×
1123

1124
                                        kve := schema.TxEntryToProto(e)
1✔
1125
                                        kve.Value = v
1✔
1126
                                        stx.Entries = append(stx.Entries, kve)
1✔
1127
                                        break
1✔
1128
                                }
1129

1130
                                // zKey = [1+setLenLen+set+scoreLen+keyLenLen+1+key+txIDLen]
1131
                                zKey := e.Key()
4✔
1132

4✔
1133
                                setLen := int(binary.BigEndian.Uint64(zKey[1:]))
4✔
1134
                                set := make([]byte, setLen)
4✔
1135
                                copy(set, zKey[1+setLenLen:])
4✔
1136

4✔
1137
                                scoreOff := 1 + setLenLen + setLen
4✔
1138
                                scoreB := binary.BigEndian.Uint64(zKey[scoreOff:])
4✔
1139
                                score := math.Float64frombits(scoreB)
4✔
1140

4✔
1141
                                keyOff := scoreOff + scoreLen + keyLenLen
4✔
1142
                                key := make([]byte, len(zKey)-keyOff-txIDLen)
4✔
1143
                                copy(key, zKey[keyOff:])
4✔
1144

4✔
1145
                                atTx := binary.BigEndian.Uint64(zKey[keyOff+len(key):])
4✔
1146

4✔
1147
                                var entry *schema.Entry
4✔
1148
                                var err error
4✔
1149

4✔
1150
                                if snap != nil {
7✔
1151
                                        entry, err = d.getAtTx(ctx, key, atTx, 1, snap, 0, skipIntegrityCheck)
3✔
1152
                                        if errors.Is(err, store.ErrKeyNotFound) || errors.Is(err, store.ErrExpiredEntry) {
3✔
1153
                                                break // ignore deleted ones (referenced key may have been deleted)
×
1154
                                        }
1155
                                        if err != nil {
3✔
1156
                                                return nil, err
×
1157
                                        }
×
1158
                                }
1159

1160
                                zentry := &schema.ZEntry{
4✔
1161
                                        Set:   set,
4✔
1162
                                        Key:   key[1:],
4✔
1163
                                        Entry: entry,
4✔
1164
                                        Score: score,
4✔
1165
                                        AtTx:  atTx,
4✔
1166
                                }
4✔
1167

4✔
1168
                                stx.ZEntries = append(stx.ZEntries, zentry)
4✔
1169
                        }
1170
                case SQLPrefix:
5✔
1171
                        {
10✔
1172
                                if spec.SqlEntriesSpec == nil || spec.SqlEntriesSpec.Action == schema.EntryTypeAction_EXCLUDE {
7✔
1173
                                        break
2✔
1174
                                }
1175

1176
                                if spec.SqlEntriesSpec.Action == schema.EntryTypeAction_ONLY_DIGEST {
4✔
1177
                                        stx.Entries = append(stx.Entries, schema.TxEntryToProto(e))
1✔
1178
                                        break
1✔
1179
                                }
1180

1181
                                if spec.SqlEntriesSpec.Action == schema.EntryTypeAction_RAW_VALUE {
3✔
1182
                                        v, err := d.st.ReadValue(e)
1✔
1183
                                        if errors.Is(err, store.ErrExpiredEntry) {
1✔
1184
                                                break
×
1185
                                        }
1186
                                        if err != nil {
1✔
1187
                                                return nil, err
×
1188
                                        }
×
1189

1190
                                        kve := schema.TxEntryToProto(e)
1✔
1191
                                        kve.Value = v
1✔
1192
                                        stx.Entries = append(stx.Entries, kve)
1✔
1193
                                        break
1✔
1194
                                }
1195

1196
                                return nil, fmt.Errorf("%w: sql entry resolution is not supported", ErrIllegalArguments)
1✔
1197
                        }
1198
                }
1199
        }
1200

1201
        return stx, nil
509✔
1202
}
1203

1204
func (d *db) mayUpdateReplicaState(committedTxID uint64, newReplicaState *schema.ReplicaState) error {
6,666✔
1205
        d.replicaStatesMutex.Lock()
6,666✔
1206
        defer d.replicaStatesMutex.Unlock()
6,666✔
1207

6,666✔
1208
        // clean up replicaStates
6,666✔
1209
        // it's safe to remove up to latest tx committed in primary
6,666✔
1210
        for uuid, st := range d.replicaStates {
7,101✔
1211
                if st.precommittedTxID <= committedTxID {
745✔
1212
                        delete(d.replicaStates, uuid)
310✔
1213
                }
310✔
1214
        }
1215

1216
        if newReplicaState.PrecommittedTxID <= committedTxID {
12,978✔
1217
                // as far as the primary is concerned, nothing really new has happened
6,312✔
1218
                return nil
6,312✔
1219
        }
6,312✔
1220

1221
        newReplicaAlh := schema.DigestFromProto(newReplicaState.PrecommittedAlh)
354✔
1222

354✔
1223
        replicaSt, ok := d.replicaStates[newReplicaState.UUID]
354✔
1224
        if ok {
392✔
1225
                if newReplicaState.PrecommittedTxID < replicaSt.precommittedTxID {
38✔
1226
                        return fmt.Errorf("%w: the newly informed replica state lags behind the previously informed one", ErrIllegalArguments)
×
1227
                }
×
1228

1229
                if newReplicaState.PrecommittedTxID == replicaSt.precommittedTxID {
75✔
1230
                        // as of the last informed replica status update, nothing has changed
37✔
1231
                        return nil
37✔
1232
                }
37✔
1233

1234
                // actual replication progress is informed by the replica
1235
                replicaSt.precommittedTxID = newReplicaState.PrecommittedTxID
1✔
1236
                replicaSt.precommittedAlh = newReplicaAlh
1✔
1237
        } else {
316✔
1238
                // replica informs first replication state
316✔
1239
                d.replicaStates[newReplicaState.UUID] = &replicaState{
316✔
1240
                        precommittedTxID: newReplicaState.PrecommittedTxID,
316✔
1241
                        precommittedAlh:  newReplicaAlh,
316✔
1242
                }
316✔
1243
        }
316✔
1244

1245
        // check up to which tx enough replicas ack replication and it's safe to commit
1246
        mayCommitUpToTxID := uint64(0)
317✔
1247
        if len(d.replicaStates) > 0 {
634✔
1248
                mayCommitUpToTxID = math.MaxUint64
317✔
1249
        }
317✔
1250

1251
        allowances := 0
317✔
1252

317✔
1253
        // we may clean up replicaStates from those who are lagging behind commit
317✔
1254
        for _, st := range d.replicaStates {
716✔
1255
                if st.precommittedTxID < mayCommitUpToTxID {
716✔
1256
                        mayCommitUpToTxID = st.precommittedTxID
317✔
1257
                }
317✔
1258
                allowances++
399✔
1259
        }
1260

1261
        if allowances >= d.options.syncAcks {
603✔
1262
                err := d.st.AllowCommitUpto(mayCommitUpToTxID)
286✔
1263
                if err != nil {
286✔
1264
                        return err
×
1265
                }
×
1266
        }
1267

1268
        return nil
317✔
1269
}
1270

1271
func (d *db) ExportTxByID(ctx context.Context, req *schema.ExportTxRequest) (txbs []byte, mayCommitUpToTxID uint64, mayCommitUpToAlh [sha256.Size]byte, err error) {
8,378✔
1272
        if req == nil {
8,378✔
1273
                return nil, 0, mayCommitUpToAlh, ErrIllegalArguments
×
1274
        }
×
1275

1276
        if d.replicaStates == nil && req.ReplicaState != nil {
8,378✔
1277
                return nil, 0, mayCommitUpToAlh, fmt.Errorf("%w: replica state was NOT expected", ErrIllegalState)
×
1278
        }
×
1279

1280
        tx, err := d.allocTx()
8,378✔
1281
        if err != nil {
8,378✔
1282
                return nil, 0, mayCommitUpToAlh, err
×
1283
        }
×
1284
        defer d.releaseTx(tx)
8,378✔
1285

8,378✔
1286
        committedTxID, committedAlh := d.st.CommittedAlh()
8,378✔
1287
        preCommittedTxID, _ := d.st.PrecommittedAlh()
8,378✔
1288

8,378✔
1289
        if req.ReplicaState != nil {
15,046✔
1290
                if req.ReplicaState.CommittedTxID > 0 {
12,806✔
1291
                        // validate replica commit state
6,138✔
1292
                        if req.ReplicaState.CommittedTxID > committedTxID {
6,138✔
1293
                                return nil, committedTxID, committedAlh, fmt.Errorf("%w: replica commit state diverged from primary's", ErrReplicaDivergedFromPrimary)
×
1294
                        }
×
1295

1296
                        // integrityCheck is currently required to validate Alh
1297
                        expectedReplicaCommitHdr, err := d.st.ReadTxHeader(req.ReplicaState.CommittedTxID, false, false)
6,138✔
1298
                        if err != nil {
6,138✔
1299
                                return nil, committedTxID, committedAlh, err
×
1300
                        }
×
1301

1302
                        replicaCommittedAlh := schema.DigestFromProto(req.ReplicaState.CommittedAlh)
6,138✔
1303

6,138✔
1304
                        if expectedReplicaCommitHdr.Alh() != replicaCommittedAlh {
6,138✔
1305
                                return nil, expectedReplicaCommitHdr.ID, expectedReplicaCommitHdr.Alh(), fmt.Errorf("%w: replica commit state diverged from primary's", ErrReplicaDivergedFromPrimary)
×
1306
                        }
×
1307
                }
1308

1309
                if req.ReplicaState.PrecommittedTxID > 0 {
12,989✔
1310
                        // validate replica precommit state
6,321✔
1311
                        if req.ReplicaState.PrecommittedTxID > preCommittedTxID {
6,323✔
1312
                                return nil, committedTxID, committedAlh, fmt.Errorf("%w: replica precommit state diverged from primary's", ErrReplicaDivergedFromPrimary)
2✔
1313
                        }
2✔
1314

1315
                        // integrityCheck is currently required to validate Alh
1316
                        expectedReplicaPrecommitHdr, err := d.st.ReadTxHeader(req.ReplicaState.PrecommittedTxID, true, false)
6,319✔
1317
                        if err != nil {
6,319✔
1318
                                return nil, committedTxID, committedAlh, err
×
1319
                        }
×
1320

1321
                        replicaPreCommittedAlh := schema.DigestFromProto(req.ReplicaState.PrecommittedAlh)
6,319✔
1322

6,319✔
1323
                        if expectedReplicaPrecommitHdr.Alh() != replicaPreCommittedAlh {
6,319✔
1324
                                return nil, expectedReplicaPrecommitHdr.ID, expectedReplicaPrecommitHdr.Alh(), fmt.Errorf("%w: replica precommit state diverged from primary's", ErrReplicaDivergedFromPrimary)
×
1325
                        }
×
1326

1327
                        // primary will provide commit state to the replica so it can commit pre-committed transactions
1328
                        if req.ReplicaState.PrecommittedTxID < committedTxID {
7,674✔
1329
                                // if replica is behind current commit state in primary
1,355✔
1330
                                // return the alh up to the point known by the replica.
1,355✔
1331
                                // That way the replica is able to validate is following the right primary.
1,355✔
1332
                                mayCommitUpToTxID = req.ReplicaState.PrecommittedTxID
1,355✔
1333
                                mayCommitUpToAlh = replicaPreCommittedAlh
1,355✔
1334
                        } else {
6,319✔
1335
                                mayCommitUpToTxID = committedTxID
4,964✔
1336
                                mayCommitUpToAlh = committedAlh
4,964✔
1337
                        }
4,964✔
1338
                }
1339

1340
                err = d.mayUpdateReplicaState(committedTxID, req.ReplicaState)
6,666✔
1341
                if err != nil {
6,666✔
1342
                        return nil, mayCommitUpToTxID, mayCommitUpToAlh, err
×
1343
                }
×
1344
        }
1345

1346
        // it might be the case primary will commit some txs (even there could be inmem-precommitted txs)
1347
        // current timeout it's not a special value but at least a relative one
1348
        // note: primary might also be waiting ack from any replica (even this primary may do progress)
1349

1350
        // TODO: under some circumstances, replica might not be able to do further progress until primary
1351
        // has made changes, such wait doesn't need to have a timeout, reducing networking and CPU utilization
1352
        var cancel context.CancelFunc
8,376✔
1353

8,376✔
1354
        if req.ReplicaState != nil {
15,042✔
1355
                ctx, cancel = context.WithTimeout(ctx, d.options.storeOpts.SyncFrequency*4)
6,666✔
1356
                defer cancel()
6,666✔
1357
        }
6,666✔
1358

1359
        err = d.WaitForTx(ctx, req.Tx, req.AllowPreCommitted)
8,376✔
1360
        if ctx.Err() != nil {
8,987✔
1361
                return nil, mayCommitUpToTxID, mayCommitUpToAlh, nil
611✔
1362
        }
611✔
1363
        if err != nil {
7,765✔
1364
                return nil, mayCommitUpToTxID, mayCommitUpToAlh, err
×
1365
        }
×
1366

1367
        txbs, err = d.st.ExportTx(req.Tx, req.AllowPreCommitted, req.SkipIntegrityCheck, tx)
7,765✔
1368
        if err != nil {
7,765✔
1369
                return nil, mayCommitUpToTxID, mayCommitUpToAlh, err
×
1370
        }
×
1371

1372
        return txbs, mayCommitUpToTxID, mayCommitUpToAlh, nil
7,765✔
1373
}
1374

1375
func (d *db) ReplicateTx(ctx context.Context, exportedTx []byte, skipIntegrityCheck bool, waitForIndexing bool) (*schema.TxHeader, error) {
7,694✔
1376
        d.mutex.RLock()
7,694✔
1377
        defer d.mutex.RUnlock()
7,694✔
1378

7,694✔
1379
        if !d.isReplica() {
7,694✔
1380
                return nil, ErrNotReplica
×
1381
        }
×
1382

1383
        hdr, err := d.st.ReplicateTx(ctx, exportedTx, skipIntegrityCheck, waitForIndexing)
7,694✔
1384
        if err != nil {
7,723✔
1385
                return nil, err
29✔
1386
        }
29✔
1387

1388
        return schema.TxHeaderToProto(hdr), nil
7,665✔
1389
}
1390

1391
// AllowCommitUpto is used by replicas to commit transactions once committed in primary
1392
func (d *db) AllowCommitUpto(txID uint64, alh [sha256.Size]byte) error {
6,027✔
1393
        d.mutex.RLock()
6,027✔
1394
        defer d.mutex.RUnlock()
6,027✔
1395

6,027✔
1396
        if !d.isReplica() {
6,027✔
1397
                return ErrNotReplica
×
1398
        }
×
1399

1400
        // replica pre-committed state must be consistent with primary
1401

1402
        committedTxID, committedAlh := d.st.CommittedAlh()
6,027✔
1403
        // handling a particular case in an optimized manner
6,027✔
1404
        if committedTxID == txID {
6,329✔
1405
                if committedAlh != alh {
302✔
1406
                        return fmt.Errorf("%w: replica commit state diverged from primary's", ErrIllegalState)
×
1407
                }
×
1408
                return nil
302✔
1409
        }
1410

1411
        hdr, err := d.st.ReadTxHeader(txID, true, false)
5,725✔
1412
        if err != nil {
5,725✔
1413
                return err
×
1414
        }
×
1415

1416
        if hdr.Alh() != alh {
5,725✔
1417
                return fmt.Errorf("%w: replica commit state diverged from primary's", ErrIllegalState)
×
1418
        }
×
1419

1420
        return d.st.AllowCommitUpto(txID)
5,725✔
1421
}
1422

1423
func (d *db) DiscardPrecommittedTxsSince(txID uint64) error {
2✔
1424
        d.mutex.RLock()
2✔
1425
        defer d.mutex.RUnlock()
2✔
1426

2✔
1427
        _, err := d.st.DiscardPrecommittedTxsSince(txID)
2✔
1428

2✔
1429
        return err
2✔
1430
}
2✔
1431

1432
// VerifiableTxByID ...
1433
func (d *db) VerifiableTxByID(ctx context.Context, req *schema.VerifiableTxRequest) (*schema.VerifiableTx, error) {
1,298✔
1434
        if req == nil {
1,299✔
1435
                return nil, ErrIllegalArguments
1✔
1436
        }
1✔
1437

1438
        lastTxID, _ := d.st.CommittedAlh()
1,297✔
1439
        if lastTxID < req.ProveSinceTx {
1,297✔
1440
                return nil, fmt.Errorf("%w: latest txID=%d is lower than specified as initial tx=%d", ErrIllegalState, lastTxID, req.ProveSinceTx)
×
1441
        }
×
1442

1443
        var snap *store.Snapshot
1,297✔
1444
        var err error
1,297✔
1445

1,297✔
1446
        if !req.KeepReferencesUnresolved {
2,594✔
1447
                snap, err = d.snapshotSince(ctx, []byte{SetKeyPrefix}, req.SinceTx)
1,297✔
1448
                if err != nil {
1,297✔
1449
                        return nil, err
×
1450
                }
×
1451
                defer snap.Close()
1,297✔
1452
        }
1453

1454
        reqTx, err := d.allocTx()
1,297✔
1455
        if err != nil {
1,297✔
1456
                return nil, err
×
1457
        }
×
1458
        defer d.releaseTx(reqTx)
1,297✔
1459

1,297✔
1460
        err = d.st.ReadTx(req.Tx, false, reqTx)
1,297✔
1461
        if err != nil {
1,297✔
1462
                return nil, err
×
1463
        }
×
1464

1465
        var sourceTxHdr, targetTxHdr *store.TxHeader
1,297✔
1466
        var rootTxHdr *store.TxHeader
1,297✔
1467

1,297✔
1468
        if req.ProveSinceTx == 0 {
1,301✔
1469
                rootTxHdr = reqTx.Header()
4✔
1470
        } else {
1,297✔
1471
                rootTxHdr, err = d.st.ReadTxHeader(req.ProveSinceTx, false, false)
1,293✔
1472
                if err != nil {
1,293✔
1473
                        return nil, err
×
1474
                }
×
1475
        }
1476

1477
        if req.ProveSinceTx <= req.Tx {
2,593✔
1478
                sourceTxHdr = rootTxHdr
1,296✔
1479
                targetTxHdr = reqTx.Header()
1,296✔
1480
        } else {
1,297✔
1481
                sourceTxHdr = reqTx.Header()
1✔
1482
                targetTxHdr = rootTxHdr
1✔
1483
        }
1✔
1484

1485
        dualProof, err := d.st.DualProof(sourceTxHdr, targetTxHdr)
1,297✔
1486
        if err != nil {
1,297✔
1487
                return nil, err
×
1488
        }
×
1489

1490
        sReqTx, err := d.serializeTx(ctx, reqTx, req.EntriesSpec, snap, true)
1,297✔
1491
        if err != nil {
1,297✔
1492
                return nil, err
×
1493
        }
×
1494

1495
        return &schema.VerifiableTx{
1,297✔
1496
                Tx:        sReqTx,
1,297✔
1497
                DualProof: schema.DualProofToProto(dualProof),
1,297✔
1498
        }, nil
1,297✔
1499
}
1500

1501
// TxScan ...
1502
func (d *db) TxScan(ctx context.Context, req *schema.TxScanRequest) (*schema.TxList, error) {
10✔
1503
        if req == nil {
11✔
1504
                return nil, ErrIllegalArguments
1✔
1505
        }
1✔
1506

1507
        if int(req.Limit) > d.maxResultSize {
10✔
1508
                return nil, fmt.Errorf("%w: the specified limit (%d) is larger than the maximum allowed one (%d)",
1✔
1509
                        ErrResultSizeLimitExceeded, req.Limit, d.maxResultSize)
1✔
1510
        }
1✔
1511

1512
        tx, err := d.allocTx()
8✔
1513
        if err != nil {
8✔
1514
                return nil, err
×
1515
        }
×
1516
        defer d.releaseTx(tx)
8✔
1517

8✔
1518
        limit := int(req.Limit)
8✔
1519
        if req.Limit == 0 {
13✔
1520
                limit = d.maxResultSize
5✔
1521
        }
5✔
1522

1523
        snap, err := d.snapshotSince(ctx, []byte{SetKeyPrefix}, req.SinceTx)
8✔
1524
        if err != nil {
8✔
1525
                return nil, err
×
1526
        }
×
1527
        defer snap.Close()
8✔
1528

8✔
1529
        txReader, err := d.st.NewTxReader(req.InitialTx, req.Desc, tx)
8✔
1530
        if err != nil {
9✔
1531
                return nil, err
1✔
1532
        }
1✔
1533

1534
        txList := &schema.TxList{}
7✔
1535

7✔
1536
        for l := 1; l <= limit; l++ {
27✔
1537
                tx, err := txReader.Read()
20✔
1538
                if errors.Is(err, store.ErrNoMoreEntries) {
22✔
1539
                        break
2✔
1540
                }
1541
                if err != nil {
18✔
1542
                        return nil, err
×
1543
                }
×
1544

1545
                sTx, err := d.serializeTx(ctx, tx, req.EntriesSpec, snap, true)
18✔
1546
                if err != nil {
18✔
1547
                        return nil, err
×
1548
                }
×
1549

1550
                txList.Txs = append(txList.Txs, sTx)
18✔
1551
        }
1552

1553
        return txList, nil
7✔
1554
}
1555

1556
// History ...
1557
func (d *db) History(ctx context.Context, req *schema.HistoryRequest) (*schema.Entries, error) {
9✔
1558
        if req == nil {
10✔
1559
                return nil, ErrIllegalArguments
1✔
1560
        }
1✔
1561

1562
        if int(req.Limit) > d.maxResultSize {
9✔
1563
                return nil, fmt.Errorf("%w: the specified limit (%d) is larger than the maximum allowed one (%d)",
1✔
1564
                        ErrResultSizeLimitExceeded, req.Limit, d.maxResultSize)
1✔
1565
        }
1✔
1566

1567
        currTxID, _ := d.st.CommittedAlh()
7✔
1568

7✔
1569
        if req.SinceTx > currTxID {
7✔
1570
                return nil, ErrIllegalArguments
×
1571
        }
×
1572

1573
        waitUntilTx := req.SinceTx
7✔
1574
        if waitUntilTx == 0 {
9✔
1575
                waitUntilTx = currTxID
2✔
1576
        }
2✔
1577

1578
        err := d.WaitForIndexingUpto(ctx, waitUntilTx)
7✔
1579
        if err != nil {
8✔
1580
                return nil, err
1✔
1581
        }
1✔
1582

1583
        limit := int(req.Limit)
6✔
1584
        if limit == 0 {
12✔
1585
                limit = d.maxResultSize
6✔
1586
        }
6✔
1587

1588
        key := EncodeKey(req.Key)
6✔
1589

6✔
1590
        valRefs, _, err := d.st.History(key, req.Offset, req.Desc, limit)
6✔
1591
        if err != nil && err != store.ErrOffsetOutOfRange {
6✔
1592
                return nil, err
×
1593
        }
×
1594

1595
        list := &schema.Entries{
6✔
1596
                Entries: make([]*schema.Entry, len(valRefs)),
6✔
1597
        }
6✔
1598

6✔
1599
        for i, valRef := range valRefs {
114✔
1600
                val, err := valRef.Resolve()
108✔
1601
                if err != nil && err != store.ErrExpiredEntry {
108✔
1602
                        return nil, err
×
1603
                }
×
1604
                if len(val) > 0 {
214✔
1605
                        val = TrimPrefix(val)
106✔
1606
                }
106✔
1607

1608
                list.Entries[i] = &schema.Entry{
108✔
1609
                        Tx:       valRef.Tx(),
108✔
1610
                        Key:      req.Key,
108✔
1611
                        Metadata: schema.KVMetadataToProto(valRef.KVMetadata()),
108✔
1612
                        Value:    val,
108✔
1613
                        Expired:  errors.Is(err, store.ErrExpiredEntry),
108✔
1614
                        Revision: valRef.HC(),
108✔
1615
                }
108✔
1616
        }
1617
        return list, nil
6✔
1618
}
1619

1620
func (d *db) IsClosed() bool {
204✔
1621
        d.closingMutex.Lock()
204✔
1622
        defer d.closingMutex.Unlock()
204✔
1623

204✔
1624
        return d.st.IsClosed()
204✔
1625
}
204✔
1626

1627
// Close ...
1628
func (d *db) Close() (err error) {
719✔
1629
        d.closingMutex.Lock()
719✔
1630
        defer d.closingMutex.Unlock()
719✔
1631

719✔
1632
        d.Logger.Infof("closing database '%s'...", d.name)
719✔
1633

719✔
1634
        defer func() {
1,438✔
1635
                if err == nil {
1,344✔
1636
                        d.Logger.Infof("database '%s' successfully closed", d.name)
625✔
1637
                } else {
719✔
1638
                        d.Logger.Infof("%v: while closing database '%s'", err, d.name)
94✔
1639
                }
94✔
1640
        }()
1641

1642
        return d.st.Close()
719✔
1643
}
1644

1645
// GetName ...
1646
func (d *db) GetName() string {
54,775✔
1647
        return d.name
54,775✔
1648
}
54,775✔
1649

1650
// GetOptions ...
1651
func (d *db) GetOptions() *Options {
3,082✔
1652
        d.mutex.RLock()
3,082✔
1653
        defer d.mutex.RUnlock()
3,082✔
1654

3,082✔
1655
        return d.options
3,082✔
1656
}
3,082✔
1657

1658
func (d *db) AsReplica(asReplica, syncReplication bool, syncAcks int) {
15✔
1659
        d.mutex.Lock()
15✔
1660
        defer d.mutex.Unlock()
15✔
1661

15✔
1662
        d.replicaStatesMutex.Lock()
15✔
1663
        defer d.replicaStatesMutex.Unlock()
15✔
1664

15✔
1665
        d.options.replica = asReplica
15✔
1666
        d.options.syncAcks = syncAcks
15✔
1667
        d.options.syncReplication = syncReplication
15✔
1668

15✔
1669
        if asReplica {
23✔
1670
                d.replicaStates = nil
8✔
1671
        } else if syncAcks > 0 {
18✔
1672
                d.replicaStates = make(map[string]*replicaState, syncAcks)
3✔
1673
        }
3✔
1674

1675
        d.st.SetExternalCommitAllowance(syncReplication)
15✔
1676
}
1677

1678
func (d *db) IsReplica() bool {
446✔
1679
        d.mutex.RLock()
446✔
1680
        defer d.mutex.RUnlock()
446✔
1681

446✔
1682
        return d.isReplica()
446✔
1683
}
446✔
1684

1685
func (d *db) isReplica() bool {
19,450✔
1686
        return d.options.replica
19,450✔
1687
}
19,450✔
1688

1689
func (d *db) IsSyncReplicationEnabled() bool {
8,376✔
1690
        d.mutex.RLock()
8,376✔
1691
        defer d.mutex.RUnlock()
8,376✔
1692

8,376✔
1693
        return d.options.syncReplication
8,376✔
1694
}
8,376✔
1695

1696
func (d *db) SetSyncReplication(enabled bool) {
216✔
1697
        d.mutex.Lock()
216✔
1698
        defer d.mutex.Unlock()
216✔
1699

216✔
1700
        d.st.SetExternalCommitAllowance(enabled)
216✔
1701

216✔
1702
        d.options.syncReplication = enabled
216✔
1703
}
216✔
1704

1705
func logErr(log logger.Logger, formattedMessage string, err error) error {
1✔
1706
        if err != nil {
2✔
1707
                log.Errorf(formattedMessage, err)
1✔
1708
        }
1✔
1709
        return err
1✔
1710
}
1711

1712
// CopyCatalog creates a copy of the sql catalog and returns a transaction
1713
// that can be used to commit the copy.
1714
func (d *db) CopyCatalogToTx(ctx context.Context, tx *store.OngoingTx) error {
13✔
1715
        // copy the sql catalog
13✔
1716
        err := d.sqlEngine.CopyCatalogToTx(ctx, tx)
13✔
1717
        if err != nil {
13✔
1718
                return err
×
1719
        }
×
1720

1721
        // copy the document store catalog
1722
        err = d.documentEngine.CopyCatalogToTx(ctx, tx)
13✔
1723
        if err != nil {
13✔
1724
                return err
×
1725
        }
×
1726

1727
        return nil
13✔
1728
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc